1use crate::queue::priority::Priority;
4use dashmap::DashMap;
5use rusmes_proto::{Mail, MailId};
6use rusmes_storage::StorageBackend;
7use serde::{Deserialize, Serialize};
8use std::collections::HashMap;
9use std::path::PathBuf;
10use std::sync::atomic::{AtomicU64, Ordering};
11use std::sync::{Arc, RwLock};
12use std::time::{Duration, SystemTime};
13
14#[derive(Debug, Clone, Serialize, Deserialize)]
16pub struct QueueEntryData {
17 pub mail_id: MailId,
18 pub attempts: u32,
19 pub max_attempts: u32,
20 #[serde(with = "systemtime_serde")]
21 pub next_retry: SystemTime,
22 pub last_error: Option<String>,
23 #[serde(default)]
24 pub priority: Priority,
25}
26
27#[derive(Debug, Clone)]
29pub struct QueueEntry {
30 pub mail: Mail,
31 pub attempts: u32,
32 pub max_attempts: u32,
33 pub next_retry: SystemTime,
34 pub last_error: Option<String>,
35 pub priority: Priority,
36}
37
38mod systemtime_serde {
40 use serde::{Deserialize, Deserializer, Serialize, Serializer};
41 use std::time::{Duration, SystemTime, UNIX_EPOCH};
42
43 pub fn serialize<S>(time: &SystemTime, serializer: S) -> Result<S::Ok, S::Error>
44 where
45 S: Serializer,
46 {
47 let duration = time
48 .duration_since(UNIX_EPOCH)
49 .map_err(serde::ser::Error::custom)?;
50 duration.as_secs().serialize(serializer)
51 }
52
53 pub fn deserialize<'de, D>(deserializer: D) -> Result<SystemTime, D::Error>
54 where
55 D: Deserializer<'de>,
56 {
57 let secs = u64::deserialize(deserializer)?;
58 Ok(UNIX_EPOCH + Duration::from_secs(secs))
59 }
60}
61
62impl QueueEntry {
63 pub fn new(mail: Mail) -> Self {
65 Self::new_with_priority(mail, Priority::default())
66 }
67
68 pub fn new_with_priority(mail: Mail, priority: Priority) -> Self {
70 Self {
71 mail,
72 attempts: 0,
73 max_attempts: 5,
74 next_retry: SystemTime::now(),
75 last_error: None,
76 priority,
77 }
78 }
79
80 pub fn priority(&self) -> Priority {
82 self.priority
83 }
84
85 pub fn set_priority(&mut self, priority: Priority) {
87 self.priority = priority;
88 }
89
90 pub fn calculate_next_retry(&mut self) {
92 let backoff_secs = 2u64.pow(self.attempts.min(10)) * 60; self.next_retry = SystemTime::now() + Duration::from_secs(backoff_secs);
94 self.attempts += 1;
95 }
96
97 pub fn should_retry(&self) -> bool {
99 self.attempts < self.max_attempts && SystemTime::now() >= self.next_retry
100 }
101
102 pub fn is_bounced(&self) -> bool {
104 self.attempts >= self.max_attempts
105 }
106
107 pub fn mail_id(&self) -> &MailId {
109 self.mail.id()
110 }
111
112 pub fn to_data(&self) -> QueueEntryData {
114 QueueEntryData {
115 mail_id: *self.mail.id(),
116 attempts: self.attempts,
117 max_attempts: self.max_attempts,
118 next_retry: self.next_retry,
119 last_error: self.last_error.clone(),
120 priority: self.priority,
121 }
122 }
123
124 pub fn from_data(data: QueueEntryData, mail: Mail) -> Self {
126 Self {
127 mail,
128 attempts: data.attempts,
129 max_attempts: data.max_attempts,
130 next_retry: data.next_retry,
131 last_error: data.last_error,
132 priority: data.priority,
133 }
134 }
135}
136
137#[async_trait::async_trait]
139pub trait QueueStore: Send + Sync {
140 async fn save_entry(&self, entry: &QueueEntry) -> anyhow::Result<()>;
142
143 async fn load_entry(&self, mail_id: &MailId) -> anyhow::Result<Option<QueueEntry>>;
145
146 async fn remove_entry(&self, mail_id: &MailId) -> anyhow::Result<()>;
148
149 async fn load_all_entries(&self) -> anyhow::Result<Vec<QueueEntry>>;
151
152 async fn save_to_dlq(&self, entry: &QueueEntry) -> anyhow::Result<()>;
154
155 async fn list_dlq(&self) -> anyhow::Result<Vec<QueueEntry>>;
157
158 async fn remove_from_dlq(&self, mail_id: &MailId) -> anyhow::Result<()>;
160}
161
162pub struct MailQueue {
164 entries: Arc<RwLock<HashMap<MailId, QueueEntry>>>,
165 store: Option<Arc<dyn QueueStore>>,
166 priority_config: Arc<RwLock<crate::queue::priority::PriorityConfig>>,
167 domain_stats: Arc<DashMap<String, AtomicU64>>,
169}
170
171impl MailQueue {
172 pub fn new() -> Self {
174 Self {
175 entries: Arc::new(RwLock::new(HashMap::new())),
176 store: None,
177 priority_config: Arc::new(RwLock::new(
178 crate::queue::priority::PriorityConfig::default(),
179 )),
180 domain_stats: Arc::new(DashMap::new()),
181 }
182 }
183
184 pub fn new_with_store(store: Arc<dyn QueueStore>) -> Self {
186 Self {
187 entries: Arc::new(RwLock::new(HashMap::new())),
188 store: Some(store),
189 priority_config: Arc::new(RwLock::new(
190 crate::queue::priority::PriorityConfig::default(),
191 )),
192 domain_stats: Arc::new(DashMap::new()),
193 }
194 }
195
196 pub fn new_with_priority_config(
198 priority_config: crate::queue::priority::PriorityConfig,
199 ) -> Self {
200 Self {
201 entries: Arc::new(RwLock::new(HashMap::new())),
202 store: None,
203 priority_config: Arc::new(RwLock::new(priority_config)),
204 domain_stats: Arc::new(DashMap::new()),
205 }
206 }
207
208 pub fn new_with_store_and_priority(
210 store: Arc<dyn QueueStore>,
211 priority_config: crate::queue::priority::PriorityConfig,
212 ) -> Self {
213 Self {
214 entries: Arc::new(RwLock::new(HashMap::new())),
215 store: Some(store),
216 priority_config: Arc::new(RwLock::new(priority_config)),
217 domain_stats: Arc::new(DashMap::new()),
218 }
219 }
220
221 pub fn update_priority_config(&self, config: crate::queue::priority::PriorityConfig) {
223 if let Ok(mut guard) = self.priority_config.write() {
224 *guard = config;
225 }
226 }
227
228 pub fn get_priority_config(&self) -> crate::queue::priority::PriorityConfig {
230 self.priority_config
231 .read()
232 .map(|g| g.clone())
233 .unwrap_or_default()
234 }
235
236 pub fn domain_stats_map(&self) -> Arc<DashMap<String, AtomicU64>> {
238 Arc::clone(&self.domain_stats)
239 }
240
241 pub fn queue_stats_per_domain(&self) -> HashMap<String, u64> {
245 self.domain_stats
246 .iter()
247 .map(|entry| (entry.key().clone(), entry.value().load(Ordering::Relaxed)))
248 .collect()
249 }
250
251 fn record_domain_stats(&self, mail: &Mail) {
253 for recipient in mail.recipients() {
254 let domain = recipient.domain().as_str().to_owned();
255 if let Some(counter) = self.domain_stats.get(&domain) {
256 counter.fetch_add(1, Ordering::Relaxed);
257 } else {
258 self.domain_stats
260 .entry(domain)
261 .or_insert_with(|| AtomicU64::new(0))
262 .fetch_add(1, Ordering::Relaxed);
263 }
264 }
265 }
266
267 pub async fn load_from_storage(&self) -> anyhow::Result<()> {
269 if let Some(store) = &self.store {
270 let entries = store.load_all_entries().await?;
271 let mut queue_entries = self
272 .entries
273 .write()
274 .map_err(|e| anyhow::anyhow!("RwLock poisoned: {}", e))?;
275 for entry in entries {
276 let mail_id = *entry.mail.id();
277 tracing::info!("Loaded mail {} from storage", mail_id);
278 queue_entries.insert(mail_id, entry);
279 }
280 }
281 Ok(())
282 }
283
284 pub async fn enqueue(&self, mail: Mail) -> anyhow::Result<()> {
286 let mail_id = *mail.id();
287
288 let priority = {
290 let config = self
291 .priority_config
292 .read()
293 .map_err(|e| anyhow::anyhow!("RwLock poisoned: {}", e))?;
294 config.calculate_priority(&mail, 0)
295 };
296
297 self.record_domain_stats(&mail);
299
300 let entry = QueueEntry::new_with_priority(mail, priority);
301
302 if let Some(store) = &self.store {
304 store.save_entry(&entry).await?;
305 }
306
307 self.entries
309 .write()
310 .map_err(|e| anyhow::anyhow!("RwLock poisoned: {}", e))?
311 .insert(mail_id, entry);
312 tracing::info!(
313 "Enqueued mail {} for delivery with priority {}",
314 mail_id,
315 priority
316 );
317 Ok(())
318 }
319
320 pub async fn enqueue_with_priority(
322 &self,
323 mail: Mail,
324 priority: Priority,
325 ) -> anyhow::Result<()> {
326 let mail_id = *mail.id();
327
328 self.record_domain_stats(&mail);
330
331 let entry = QueueEntry::new_with_priority(mail, priority);
332
333 if let Some(store) = &self.store {
335 store.save_entry(&entry).await?;
336 }
337
338 self.entries
340 .write()
341 .map_err(|e| anyhow::anyhow!("RwLock poisoned: {}", e))?
342 .insert(mail_id, entry);
343 tracing::info!(
344 "Enqueued mail {} for delivery with priority {}",
345 mail_id,
346 priority
347 );
348 Ok(())
349 }
350
351 pub fn get_ready_for_retry(&self, limit: usize) -> Vec<QueueEntry> {
353 let entries = match self.entries.read() {
354 Ok(guard) => guard,
355 Err(poisoned) => poisoned.into_inner(),
356 };
357 let mut ready: Vec<QueueEntry> = entries
358 .values()
359 .filter(|e| e.should_retry())
360 .cloned()
361 .collect();
362
363 ready.sort_by(|a, b| match b.priority.cmp(&a.priority) {
365 std::cmp::Ordering::Equal => a.next_retry.cmp(&b.next_retry),
366 other => other,
367 });
368
369 ready.into_iter().take(limit).collect()
370 }
371
372 pub fn get_ready_for_retry_by_priority(
374 &self,
375 priority: Priority,
376 limit: usize,
377 ) -> Vec<QueueEntry> {
378 let entries = match self.entries.read() {
379 Ok(guard) => guard,
380 Err(poisoned) => poisoned.into_inner(),
381 };
382 entries
383 .values()
384 .filter(|e| e.should_retry() && e.priority == priority)
385 .take(limit)
386 .cloned()
387 .collect()
388 }
389
390 pub async fn mark_failed(&self, mail_id: &MailId, error: String) -> anyhow::Result<()> {
392 let (should_move_to_dlq, entry_to_save) = {
393 let mut entries = self
394 .entries
395 .write()
396 .map_err(|e| anyhow::anyhow!("RwLock poisoned: {}", e))?;
397 if let Some(entry) = entries.get_mut(mail_id) {
398 entry.last_error = Some(error.clone());
399 entry.calculate_next_retry();
400
401 let new_priority = {
403 let config = self
404 .priority_config
405 .read()
406 .map_err(|e| anyhow::anyhow!("RwLock poisoned: {}", e))?;
407 if config.inherit_priority_on_retry {
408 config.calculate_priority(&entry.mail, entry.attempts)
409 } else {
410 entry.priority
411 }
412 };
413
414 if new_priority != entry.priority {
415 tracing::info!(
416 "Mail {} priority boosted from {} to {} after {} attempts",
417 mail_id,
418 entry.priority,
419 new_priority,
420 entry.attempts
421 );
422 entry.priority = new_priority;
423 }
424
425 if entry.is_bounced() {
426 tracing::warn!(
427 "Mail {} exceeded max delivery attempts ({}), moving to DLQ",
428 mail_id,
429 entry.max_attempts
430 );
431 (true, None)
432 } else {
433 tracing::info!(
434 "Mail {} delivery failed (attempt {}/{}), priority {}, retry at {:?}",
435 mail_id,
436 entry.attempts,
437 entry.max_attempts,
438 entry.priority,
439 entry.next_retry
440 );
441 (false, Some(entry.clone()))
442 }
443 } else {
444 (false, None)
445 }
446 }; if let Some(entry) = entry_to_save {
450 if let Some(store) = &self.store {
451 store.save_entry(&entry).await?;
452 }
453 }
454
455 if should_move_to_dlq {
457 self.move_to_dlq(mail_id).await?;
458 }
459
460 Ok(())
461 }
462
463 async fn move_to_dlq(&self, mail_id: &MailId) -> anyhow::Result<()> {
465 let entry = self
466 .entries
467 .write()
468 .map_err(|e| anyhow::anyhow!("RwLock poisoned: {}", e))?
469 .remove(mail_id);
470
471 if let Some(entry) = entry {
472 if let Some(store) = &self.store {
473 store.save_to_dlq(&entry).await?;
475 store.remove_entry(mail_id).await?;
477 tracing::info!("Moved mail {} to dead letter queue", mail_id);
478 } else {
479 tracing::warn!("Mail {} bounced but no DLQ storage available", mail_id);
481 }
482 }
483 Ok(())
484 }
485
486 pub async fn mark_delivered(&self, mail_id: &MailId) -> anyhow::Result<()> {
488 if let Some(store) = &self.store {
490 store.remove_entry(mail_id).await?;
491 }
492
493 self.entries
495 .write()
496 .map_err(|e| anyhow::anyhow!("RwLock poisoned: {}", e))?
497 .remove(mail_id);
498 tracing::info!(
499 "Mail {} successfully delivered and removed from queue",
500 mail_id
501 );
502 Ok(())
503 }
504
505 pub fn get_bounced(&self) -> Vec<QueueEntry> {
507 let entries = match self.entries.read() {
508 Ok(guard) => guard,
509 Err(poisoned) => poisoned.into_inner(),
510 };
511 entries
512 .values()
513 .filter(|e| e.is_bounced())
514 .cloned()
515 .collect()
516 }
517
518 pub async fn list_dlq(&self) -> anyhow::Result<Vec<QueueEntry>> {
520 if let Some(store) = &self.store {
521 store.list_dlq().await
522 } else {
523 Ok(Vec::new())
524 }
525 }
526
527 pub async fn remove_from_dlq(&self, mail_id: &MailId) -> anyhow::Result<()> {
529 if let Some(store) = &self.store {
530 store.remove_from_dlq(mail_id).await?;
531 }
532 Ok(())
533 }
534
535 pub async fn retry_from_dlq(&self, mail_id: &MailId) -> anyhow::Result<()> {
537 if let Some(store) = &self.store {
538 let dlq_entries = store.list_dlq().await?;
540 if let Some(mut entry) = dlq_entries.into_iter().find(|e| e.mail.id() == mail_id) {
541 entry.attempts = 0;
543 entry.next_retry = SystemTime::now();
544 entry.last_error = None;
545
546 store.save_entry(&entry).await?;
548 self.entries
549 .write()
550 .map_err(|e| anyhow::anyhow!("RwLock poisoned: {}", e))?
551 .insert(*mail_id, entry);
552
553 store.remove_from_dlq(mail_id).await?;
555
556 tracing::info!("Retrying mail {} from dead letter queue", mail_id);
557 }
558 }
559 Ok(())
560 }
561
562 pub async fn remove(&self, mail_id: &MailId) -> anyhow::Result<Option<QueueEntry>> {
564 if let Some(store) = &self.store {
565 store.remove_entry(mail_id).await?;
566 }
567 Ok(self
568 .entries
569 .write()
570 .map_err(|e| anyhow::anyhow!("RwLock poisoned: {}", e))?
571 .remove(mail_id))
572 }
573
574 pub fn stats(&self) -> QueueStats {
576 let entries = match self.entries.read() {
577 Ok(guard) => guard,
578 Err(poisoned) => poisoned.into_inner(),
579 };
580 let total = entries.len();
581 let ready = entries.values().filter(|e| e.should_retry()).count();
582 let bounced = entries.values().filter(|e| e.is_bounced()).count();
583
584 QueueStats {
585 total,
586 ready,
587 bounced,
588 delayed: total - ready - bounced,
589 }
590 }
591
592 pub fn stats_for_priority(&self, priority: Priority) -> QueueStats {
594 let entries = match self.entries.read() {
595 Ok(guard) => guard,
596 Err(poisoned) => poisoned.into_inner(),
597 };
598 let priority_entries: Vec<_> = entries
599 .values()
600 .filter(|e| e.priority == priority)
601 .collect();
602
603 let total = priority_entries.len();
604 let ready = priority_entries.iter().filter(|e| e.should_retry()).count();
605 let bounced = priority_entries.iter().filter(|e| e.is_bounced()).count();
606
607 QueueStats {
608 total,
609 ready,
610 bounced,
611 delayed: total - ready - bounced,
612 }
613 }
614
615 pub fn stats_by_priority(&self) -> HashMap<Priority, QueueStats> {
617 let mut stats_map = HashMap::new();
618
619 for &priority in Priority::all() {
620 stats_map.insert(priority, self.stats_for_priority(priority));
621 }
622
623 stats_map
624 }
625
626 pub fn list_all(&self) -> Vec<QueueEntry> {
628 match self.entries.read() {
629 Ok(guard) => guard.values().cloned().collect(),
630 Err(poisoned) => poisoned.into_inner().values().cloned().collect(),
631 }
632 }
633
634 pub fn list_by_priority(&self, priority: Priority) -> Vec<QueueEntry> {
636 match self.entries.read() {
637 Ok(guard) => guard
638 .values()
639 .filter(|e| e.priority == priority)
640 .cloned()
641 .collect(),
642 Err(poisoned) => poisoned
643 .into_inner()
644 .values()
645 .filter(|e| e.priority == priority)
646 .cloned()
647 .collect(),
648 }
649 }
650}
651
652impl Default for MailQueue {
653 fn default() -> Self {
654 Self::new()
655 }
656}
657
658#[derive(Debug, Clone)]
660pub struct QueueStats {
661 pub total: usize,
662 pub ready: usize,
663 pub bounced: usize,
664 pub delayed: usize,
665}
666
667pub struct FilesystemQueueStore {
669 queue_dir: PathBuf,
670 dlq_dir: PathBuf,
671 storage: Arc<dyn StorageBackend>,
672}
673
674impl FilesystemQueueStore {
675 pub fn new(base_path: impl Into<PathBuf>, storage: Arc<dyn StorageBackend>) -> Self {
677 let base_path: PathBuf = base_path.into();
678 let queue_dir = base_path.join("queue");
679 let dlq_dir = base_path.join("dlq");
680
681 Self {
682 queue_dir,
683 dlq_dir,
684 storage,
685 }
686 }
687
688 async fn ensure_dirs(&self) -> anyhow::Result<()> {
690 tokio::fs::create_dir_all(&self.queue_dir).await?;
691 tokio::fs::create_dir_all(&self.dlq_dir).await?;
692 Ok(())
693 }
694
695 fn entry_metadata_path(&self, mail_id: &MailId) -> PathBuf {
697 self.queue_dir.join(format!("{}.json", mail_id))
698 }
699
700 fn dlq_metadata_path(&self, mail_id: &MailId) -> PathBuf {
702 self.dlq_dir.join(format!("{}.json", mail_id))
703 }
704}
705
706#[async_trait::async_trait]
707impl QueueStore for FilesystemQueueStore {
708 async fn save_entry(&self, entry: &QueueEntry) -> anyhow::Result<()> {
709 self.ensure_dirs().await?;
710
711 let data = entry.to_data();
713 let json = serde_json::to_string_pretty(&data)?;
714 let metadata_path = self.entry_metadata_path(entry.mail_id());
715 tokio::fs::write(&metadata_path, json).await?;
716
717 let message_store = self.storage.message_store();
719 let mailbox_id = rusmes_storage::MailboxId::new(); message_store
721 .append_message(&mailbox_id, entry.mail.clone())
722 .await?;
723
724 Ok(())
725 }
726
727 async fn load_entry(&self, mail_id: &MailId) -> anyhow::Result<Option<QueueEntry>> {
728 let metadata_path = self.entry_metadata_path(mail_id);
729
730 if !tokio::fs::try_exists(&metadata_path).await? {
732 return Ok(None);
733 }
734
735 let json = tokio::fs::read_to_string(&metadata_path).await?;
737 let data: QueueEntryData = serde_json::from_str(&json)?;
738
739 let message_store = self.storage.message_store();
741 let mail_msg_id = rusmes_proto::MessageId::new(); if let Some(mail) = message_store.get_message(&mail_msg_id).await? {
743 Ok(Some(QueueEntry::from_data(data, mail)))
744 } else {
745 Ok(None)
746 }
747 }
748
749 async fn remove_entry(&self, mail_id: &MailId) -> anyhow::Result<()> {
750 let metadata_path = self.entry_metadata_path(mail_id);
751
752 if tokio::fs::try_exists(&metadata_path).await? {
754 tokio::fs::remove_file(&metadata_path).await?;
755 }
756
757 Ok(())
760 }
761
762 async fn load_all_entries(&self) -> anyhow::Result<Vec<QueueEntry>> {
763 self.ensure_dirs().await?;
764
765 let mut entries = Vec::new();
766 let mut read_dir = tokio::fs::read_dir(&self.queue_dir).await?;
767
768 while let Some(entry) = read_dir.next_entry().await? {
769 let path = entry.path();
770 if path.extension().and_then(|s| s.to_str()) == Some("json") {
771 let json = tokio::fs::read_to_string(&path).await?;
773 if let Ok(data) = serde_json::from_str::<QueueEntryData>(&json) {
774 let message_store = self.storage.message_store();
776 let mail_msg_id = rusmes_proto::MessageId::new(); if let Ok(Some(mail)) = message_store.get_message(&mail_msg_id).await {
778 entries.push(QueueEntry::from_data(data, mail));
779 }
780 }
781 }
782 }
783
784 Ok(entries)
785 }
786
787 async fn save_to_dlq(&self, entry: &QueueEntry) -> anyhow::Result<()> {
788 self.ensure_dirs().await?;
789
790 let data = entry.to_data();
792 let json = serde_json::to_string_pretty(&data)?;
793 let metadata_path = self.dlq_metadata_path(entry.mail_id());
794 tokio::fs::write(&metadata_path, json).await?;
795
796 Ok(())
799 }
800
801 async fn list_dlq(&self) -> anyhow::Result<Vec<QueueEntry>> {
802 self.ensure_dirs().await?;
803
804 let mut entries = Vec::new();
805 let mut read_dir = tokio::fs::read_dir(&self.dlq_dir).await?;
806
807 while let Some(entry) = read_dir.next_entry().await? {
808 let path = entry.path();
809 if path.extension().and_then(|s| s.to_str()) == Some("json") {
810 let json = tokio::fs::read_to_string(&path).await?;
812 if let Ok(data) = serde_json::from_str::<QueueEntryData>(&json) {
813 let message_store = self.storage.message_store();
815 let mail_msg_id = rusmes_proto::MessageId::new(); if let Ok(Some(mail)) = message_store.get_message(&mail_msg_id).await {
817 entries.push(QueueEntry::from_data(data, mail));
818 }
819 }
820 }
821 }
822
823 Ok(entries)
824 }
825
826 async fn remove_from_dlq(&self, mail_id: &MailId) -> anyhow::Result<()> {
827 let metadata_path = self.dlq_metadata_path(mail_id);
828
829 if tokio::fs::try_exists(&metadata_path).await? {
831 tokio::fs::remove_file(&metadata_path).await?;
832 }
833
834 Ok(())
835 }
836}
837
838#[cfg(test)]
839mod tests {
840 use super::*;
841 use bytes::Bytes;
842 use rusmes_proto::{HeaderMap, MessageBody, MimeMessage};
843
844 fn make_mail(sender: Option<&str>, recipients: Vec<&str>) -> Mail {
845 let message = MimeMessage::new(HeaderMap::new(), MessageBody::Small(Bytes::from("Test")));
846 Mail::new(
847 sender.and_then(|s| s.parse().ok()),
848 recipients.iter().filter_map(|r| r.parse().ok()).collect(),
849 message,
850 None,
851 None,
852 )
853 }
854
855 #[tokio::test]
856 async fn test_queue_enqueue_dequeue() {
857 let queue = MailQueue::new();
858 let mail = make_mail(Some("sender@example.com"), vec!["recipient@example.com"]);
859
860 let mail_id = *mail.id();
861 queue.enqueue(mail).await.unwrap();
862
863 let stats = queue.stats();
864 assert_eq!(stats.total, 1);
865 assert_eq!(stats.ready, 1);
866
867 queue.mark_delivered(&mail_id).await.unwrap();
868 let stats = queue.stats();
869 assert_eq!(stats.total, 0);
870 }
871
872 #[test]
873 fn test_retry_backoff() {
874 let mut entry = QueueEntry::new(make_mail(None, vec![]));
875
876 entry.calculate_next_retry();
877 assert_eq!(entry.attempts, 1);
878
879 entry.calculate_next_retry();
880 assert_eq!(entry.attempts, 2);
881
882 entry.attempts = 5;
884 assert!(entry.is_bounced());
885 }
886
887 #[tokio::test]
888 async fn queue_priority_ordering() {
889 use crate::queue::priority::{Priority, PriorityQueue};
891 use rusmes_proto::MailId;
892
893 let mut queue = PriorityQueue::<&str>::with_default_config();
894
895 queue.enqueue(MailId::new(), "High msg", Priority::High);
896 queue.enqueue(MailId::new(), "Low msg 1", Priority::Low);
897 queue.enqueue(MailId::new(), "Low msg 2", Priority::Low);
898 queue.enqueue(MailId::new(), "Normal msg", Priority::Normal);
899
900 let (_, item1, p1) = queue.dequeue().unwrap();
901 assert_eq!(p1, Priority::High, "First dequeued should be High");
902 assert_eq!(item1, "High msg");
903
904 let (_, item2, p2) = queue.dequeue().unwrap();
905 assert_eq!(p2, Priority::Normal, "Second dequeued should be Normal");
906 assert_eq!(item2, "Normal msg");
907
908 let (_, _, p3) = queue.dequeue().unwrap();
909 assert_eq!(p3, Priority::Low, "Third dequeued should be Low");
910
911 let (_, _, p4) = queue.dequeue().unwrap();
912 assert_eq!(p4, Priority::Low, "Fourth dequeued should be Low");
913
914 assert!(queue.is_empty());
915 }
916
917 #[tokio::test]
918 async fn queue_stats_per_domain_counts() {
919 let queue = MailQueue::new();
920
921 for _ in 0..5 {
923 let mail = make_mail(Some("sender@x.com"), vec!["a@example.com"]);
924 queue.enqueue(mail).await.unwrap();
925 }
926
927 for _ in 0..3 {
929 let mail = make_mail(Some("sender@x.com"), vec!["b@example.org"]);
930 queue.enqueue(mail).await.unwrap();
931 }
932
933 let stats = queue.queue_stats_per_domain();
934 assert_eq!(
935 stats.get("example.com").copied().unwrap_or(0),
936 5,
937 "example.com should have 5 messages"
938 );
939 assert_eq!(
940 stats.get("example.org").copied().unwrap_or(0),
941 3,
942 "example.org should have 3 messages"
943 );
944 }
945}