use crate::queue::priority::Priority;
use dashmap::DashMap;
use rusmes_proto::{Mail, MailId};
use rusmes_storage::StorageBackend;
use serde::{Deserialize, Serialize};
use std::collections::HashMap;
use std::path::PathBuf;
use std::sync::atomic::{AtomicU64, Ordering};
use std::sync::{Arc, RwLock};
use std::time::{Duration, SystemTime};
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct QueueEntryData {
pub mail_id: MailId,
pub attempts: u32,
pub max_attempts: u32,
#[serde(with = "systemtime_serde")]
pub next_retry: SystemTime,
pub last_error: Option<String>,
#[serde(default)]
pub priority: Priority,
}
#[derive(Debug, Clone)]
pub struct QueueEntry {
pub mail: Mail,
pub attempts: u32,
pub max_attempts: u32,
pub next_retry: SystemTime,
pub last_error: Option<String>,
pub priority: Priority,
}
mod systemtime_serde {
use serde::{Deserialize, Deserializer, Serialize, Serializer};
use std::time::{Duration, SystemTime, UNIX_EPOCH};
pub fn serialize<S>(time: &SystemTime, serializer: S) -> Result<S::Ok, S::Error>
where
S: Serializer,
{
let duration = time
.duration_since(UNIX_EPOCH)
.map_err(serde::ser::Error::custom)?;
duration.as_secs().serialize(serializer)
}
pub fn deserialize<'de, D>(deserializer: D) -> Result<SystemTime, D::Error>
where
D: Deserializer<'de>,
{
let secs = u64::deserialize(deserializer)?;
Ok(UNIX_EPOCH + Duration::from_secs(secs))
}
}
impl QueueEntry {
pub fn new(mail: Mail) -> Self {
Self::new_with_priority(mail, Priority::default())
}
pub fn new_with_priority(mail: Mail, priority: Priority) -> Self {
Self {
mail,
attempts: 0,
max_attempts: 5,
next_retry: SystemTime::now(),
last_error: None,
priority,
}
}
pub fn priority(&self) -> Priority {
self.priority
}
pub fn set_priority(&mut self, priority: Priority) {
self.priority = priority;
}
pub fn calculate_next_retry(&mut self) {
let backoff_secs = 2u64.pow(self.attempts.min(10)) * 60; self.next_retry = SystemTime::now() + Duration::from_secs(backoff_secs);
self.attempts += 1;
}
pub fn should_retry(&self) -> bool {
self.attempts < self.max_attempts && SystemTime::now() >= self.next_retry
}
pub fn is_bounced(&self) -> bool {
self.attempts >= self.max_attempts
}
pub fn mail_id(&self) -> &MailId {
self.mail.id()
}
pub fn to_data(&self) -> QueueEntryData {
QueueEntryData {
mail_id: *self.mail.id(),
attempts: self.attempts,
max_attempts: self.max_attempts,
next_retry: self.next_retry,
last_error: self.last_error.clone(),
priority: self.priority,
}
}
pub fn from_data(data: QueueEntryData, mail: Mail) -> Self {
Self {
mail,
attempts: data.attempts,
max_attempts: data.max_attempts,
next_retry: data.next_retry,
last_error: data.last_error,
priority: data.priority,
}
}
}
#[async_trait::async_trait]
pub trait QueueStore: Send + Sync {
async fn save_entry(&self, entry: &QueueEntry) -> anyhow::Result<()>;
async fn load_entry(&self, mail_id: &MailId) -> anyhow::Result<Option<QueueEntry>>;
async fn remove_entry(&self, mail_id: &MailId) -> anyhow::Result<()>;
async fn load_all_entries(&self) -> anyhow::Result<Vec<QueueEntry>>;
async fn save_to_dlq(&self, entry: &QueueEntry) -> anyhow::Result<()>;
async fn list_dlq(&self) -> anyhow::Result<Vec<QueueEntry>>;
async fn remove_from_dlq(&self, mail_id: &MailId) -> anyhow::Result<()>;
}
pub struct MailQueue {
entries: Arc<RwLock<HashMap<MailId, QueueEntry>>>,
store: Option<Arc<dyn QueueStore>>,
priority_config: Arc<RwLock<crate::queue::priority::PriorityConfig>>,
domain_stats: Arc<DashMap<String, AtomicU64>>,
}
impl MailQueue {
pub fn new() -> Self {
Self {
entries: Arc::new(RwLock::new(HashMap::new())),
store: None,
priority_config: Arc::new(RwLock::new(
crate::queue::priority::PriorityConfig::default(),
)),
domain_stats: Arc::new(DashMap::new()),
}
}
pub fn new_with_store(store: Arc<dyn QueueStore>) -> Self {
Self {
entries: Arc::new(RwLock::new(HashMap::new())),
store: Some(store),
priority_config: Arc::new(RwLock::new(
crate::queue::priority::PriorityConfig::default(),
)),
domain_stats: Arc::new(DashMap::new()),
}
}
pub fn new_with_priority_config(
priority_config: crate::queue::priority::PriorityConfig,
) -> Self {
Self {
entries: Arc::new(RwLock::new(HashMap::new())),
store: None,
priority_config: Arc::new(RwLock::new(priority_config)),
domain_stats: Arc::new(DashMap::new()),
}
}
pub fn new_with_store_and_priority(
store: Arc<dyn QueueStore>,
priority_config: crate::queue::priority::PriorityConfig,
) -> Self {
Self {
entries: Arc::new(RwLock::new(HashMap::new())),
store: Some(store),
priority_config: Arc::new(RwLock::new(priority_config)),
domain_stats: Arc::new(DashMap::new()),
}
}
pub fn update_priority_config(&self, config: crate::queue::priority::PriorityConfig) {
if let Ok(mut guard) = self.priority_config.write() {
*guard = config;
}
}
pub fn get_priority_config(&self) -> crate::queue::priority::PriorityConfig {
self.priority_config
.read()
.map(|g| g.clone())
.unwrap_or_default()
}
pub fn domain_stats_map(&self) -> Arc<DashMap<String, AtomicU64>> {
Arc::clone(&self.domain_stats)
}
pub fn queue_stats_per_domain(&self) -> HashMap<String, u64> {
self.domain_stats
.iter()
.map(|entry| (entry.key().clone(), entry.value().load(Ordering::Relaxed)))
.collect()
}
fn record_domain_stats(&self, mail: &Mail) {
for recipient in mail.recipients() {
let domain = recipient.domain().as_str().to_owned();
if let Some(counter) = self.domain_stats.get(&domain) {
counter.fetch_add(1, Ordering::Relaxed);
} else {
self.domain_stats
.entry(domain)
.or_insert_with(|| AtomicU64::new(0))
.fetch_add(1, Ordering::Relaxed);
}
}
}
pub async fn load_from_storage(&self) -> anyhow::Result<()> {
if let Some(store) = &self.store {
let entries = store.load_all_entries().await?;
let mut queue_entries = self
.entries
.write()
.map_err(|e| anyhow::anyhow!("RwLock poisoned: {}", e))?;
for entry in entries {
let mail_id = *entry.mail.id();
tracing::info!("Loaded mail {} from storage", mail_id);
queue_entries.insert(mail_id, entry);
}
}
Ok(())
}
pub async fn enqueue(&self, mail: Mail) -> anyhow::Result<()> {
let mail_id = *mail.id();
let priority = {
let config = self
.priority_config
.read()
.map_err(|e| anyhow::anyhow!("RwLock poisoned: {}", e))?;
config.calculate_priority(&mail, 0)
};
self.record_domain_stats(&mail);
let entry = QueueEntry::new_with_priority(mail, priority);
if let Some(store) = &self.store {
store.save_entry(&entry).await?;
}
self.entries
.write()
.map_err(|e| anyhow::anyhow!("RwLock poisoned: {}", e))?
.insert(mail_id, entry);
tracing::info!(
"Enqueued mail {} for delivery with priority {}",
mail_id,
priority
);
Ok(())
}
pub async fn enqueue_with_priority(
&self,
mail: Mail,
priority: Priority,
) -> anyhow::Result<()> {
let mail_id = *mail.id();
self.record_domain_stats(&mail);
let entry = QueueEntry::new_with_priority(mail, priority);
if let Some(store) = &self.store {
store.save_entry(&entry).await?;
}
self.entries
.write()
.map_err(|e| anyhow::anyhow!("RwLock poisoned: {}", e))?
.insert(mail_id, entry);
tracing::info!(
"Enqueued mail {} for delivery with priority {}",
mail_id,
priority
);
Ok(())
}
pub fn get_ready_for_retry(&self, limit: usize) -> Vec<QueueEntry> {
let entries = match self.entries.read() {
Ok(guard) => guard,
Err(poisoned) => poisoned.into_inner(),
};
let mut ready: Vec<QueueEntry> = entries
.values()
.filter(|e| e.should_retry())
.cloned()
.collect();
ready.sort_by(|a, b| match b.priority.cmp(&a.priority) {
std::cmp::Ordering::Equal => a.next_retry.cmp(&b.next_retry),
other => other,
});
ready.into_iter().take(limit).collect()
}
pub fn get_ready_for_retry_by_priority(
&self,
priority: Priority,
limit: usize,
) -> Vec<QueueEntry> {
let entries = match self.entries.read() {
Ok(guard) => guard,
Err(poisoned) => poisoned.into_inner(),
};
entries
.values()
.filter(|e| e.should_retry() && e.priority == priority)
.take(limit)
.cloned()
.collect()
}
pub async fn mark_failed(&self, mail_id: &MailId, error: String) -> anyhow::Result<()> {
let (should_move_to_dlq, entry_to_save) = {
let mut entries = self
.entries
.write()
.map_err(|e| anyhow::anyhow!("RwLock poisoned: {}", e))?;
if let Some(entry) = entries.get_mut(mail_id) {
entry.last_error = Some(error.clone());
entry.calculate_next_retry();
let new_priority = {
let config = self
.priority_config
.read()
.map_err(|e| anyhow::anyhow!("RwLock poisoned: {}", e))?;
if config.inherit_priority_on_retry {
config.calculate_priority(&entry.mail, entry.attempts)
} else {
entry.priority
}
};
if new_priority != entry.priority {
tracing::info!(
"Mail {} priority boosted from {} to {} after {} attempts",
mail_id,
entry.priority,
new_priority,
entry.attempts
);
entry.priority = new_priority;
}
if entry.is_bounced() {
tracing::warn!(
"Mail {} exceeded max delivery attempts ({}), moving to DLQ",
mail_id,
entry.max_attempts
);
(true, None)
} else {
tracing::info!(
"Mail {} delivery failed (attempt {}/{}), priority {}, retry at {:?}",
mail_id,
entry.attempts,
entry.max_attempts,
entry.priority,
entry.next_retry
);
(false, Some(entry.clone()))
}
} else {
(false, None)
}
};
if let Some(entry) = entry_to_save {
if let Some(store) = &self.store {
store.save_entry(&entry).await?;
}
}
if should_move_to_dlq {
self.move_to_dlq(mail_id).await?;
}
Ok(())
}
async fn move_to_dlq(&self, mail_id: &MailId) -> anyhow::Result<()> {
let entry = self
.entries
.write()
.map_err(|e| anyhow::anyhow!("RwLock poisoned: {}", e))?
.remove(mail_id);
if let Some(entry) = entry {
if let Some(store) = &self.store {
store.save_to_dlq(&entry).await?;
store.remove_entry(mail_id).await?;
tracing::info!("Moved mail {} to dead letter queue", mail_id);
} else {
tracing::warn!("Mail {} bounced but no DLQ storage available", mail_id);
}
}
Ok(())
}
pub async fn mark_delivered(&self, mail_id: &MailId) -> anyhow::Result<()> {
if let Some(store) = &self.store {
store.remove_entry(mail_id).await?;
}
self.entries
.write()
.map_err(|e| anyhow::anyhow!("RwLock poisoned: {}", e))?
.remove(mail_id);
tracing::info!(
"Mail {} successfully delivered and removed from queue",
mail_id
);
Ok(())
}
pub fn get_bounced(&self) -> Vec<QueueEntry> {
let entries = match self.entries.read() {
Ok(guard) => guard,
Err(poisoned) => poisoned.into_inner(),
};
entries
.values()
.filter(|e| e.is_bounced())
.cloned()
.collect()
}
pub async fn list_dlq(&self) -> anyhow::Result<Vec<QueueEntry>> {
if let Some(store) = &self.store {
store.list_dlq().await
} else {
Ok(Vec::new())
}
}
pub async fn remove_from_dlq(&self, mail_id: &MailId) -> anyhow::Result<()> {
if let Some(store) = &self.store {
store.remove_from_dlq(mail_id).await?;
}
Ok(())
}
pub async fn retry_from_dlq(&self, mail_id: &MailId) -> anyhow::Result<()> {
if let Some(store) = &self.store {
let dlq_entries = store.list_dlq().await?;
if let Some(mut entry) = dlq_entries.into_iter().find(|e| e.mail.id() == mail_id) {
entry.attempts = 0;
entry.next_retry = SystemTime::now();
entry.last_error = None;
store.save_entry(&entry).await?;
self.entries
.write()
.map_err(|e| anyhow::anyhow!("RwLock poisoned: {}", e))?
.insert(*mail_id, entry);
store.remove_from_dlq(mail_id).await?;
tracing::info!("Retrying mail {} from dead letter queue", mail_id);
}
}
Ok(())
}
pub async fn remove(&self, mail_id: &MailId) -> anyhow::Result<Option<QueueEntry>> {
if let Some(store) = &self.store {
store.remove_entry(mail_id).await?;
}
Ok(self
.entries
.write()
.map_err(|e| anyhow::anyhow!("RwLock poisoned: {}", e))?
.remove(mail_id))
}
pub fn stats(&self) -> QueueStats {
let entries = match self.entries.read() {
Ok(guard) => guard,
Err(poisoned) => poisoned.into_inner(),
};
let total = entries.len();
let ready = entries.values().filter(|e| e.should_retry()).count();
let bounced = entries.values().filter(|e| e.is_bounced()).count();
QueueStats {
total,
ready,
bounced,
delayed: total - ready - bounced,
}
}
pub fn stats_for_priority(&self, priority: Priority) -> QueueStats {
let entries = match self.entries.read() {
Ok(guard) => guard,
Err(poisoned) => poisoned.into_inner(),
};
let priority_entries: Vec<_> = entries
.values()
.filter(|e| e.priority == priority)
.collect();
let total = priority_entries.len();
let ready = priority_entries.iter().filter(|e| e.should_retry()).count();
let bounced = priority_entries.iter().filter(|e| e.is_bounced()).count();
QueueStats {
total,
ready,
bounced,
delayed: total - ready - bounced,
}
}
pub fn stats_by_priority(&self) -> HashMap<Priority, QueueStats> {
let mut stats_map = HashMap::new();
for &priority in Priority::all() {
stats_map.insert(priority, self.stats_for_priority(priority));
}
stats_map
}
pub fn list_all(&self) -> Vec<QueueEntry> {
match self.entries.read() {
Ok(guard) => guard.values().cloned().collect(),
Err(poisoned) => poisoned.into_inner().values().cloned().collect(),
}
}
pub fn list_by_priority(&self, priority: Priority) -> Vec<QueueEntry> {
match self.entries.read() {
Ok(guard) => guard
.values()
.filter(|e| e.priority == priority)
.cloned()
.collect(),
Err(poisoned) => poisoned
.into_inner()
.values()
.filter(|e| e.priority == priority)
.cloned()
.collect(),
}
}
}
impl Default for MailQueue {
fn default() -> Self {
Self::new()
}
}
#[derive(Debug, Clone)]
pub struct QueueStats {
pub total: usize,
pub ready: usize,
pub bounced: usize,
pub delayed: usize,
}
pub struct FilesystemQueueStore {
queue_dir: PathBuf,
dlq_dir: PathBuf,
storage: Arc<dyn StorageBackend>,
}
impl FilesystemQueueStore {
pub fn new(base_path: impl Into<PathBuf>, storage: Arc<dyn StorageBackend>) -> Self {
let base_path: PathBuf = base_path.into();
let queue_dir = base_path.join("queue");
let dlq_dir = base_path.join("dlq");
Self {
queue_dir,
dlq_dir,
storage,
}
}
async fn ensure_dirs(&self) -> anyhow::Result<()> {
tokio::fs::create_dir_all(&self.queue_dir).await?;
tokio::fs::create_dir_all(&self.dlq_dir).await?;
Ok(())
}
fn entry_metadata_path(&self, mail_id: &MailId) -> PathBuf {
self.queue_dir.join(format!("{}.json", mail_id))
}
fn dlq_metadata_path(&self, mail_id: &MailId) -> PathBuf {
self.dlq_dir.join(format!("{}.json", mail_id))
}
}
#[async_trait::async_trait]
impl QueueStore for FilesystemQueueStore {
async fn save_entry(&self, entry: &QueueEntry) -> anyhow::Result<()> {
self.ensure_dirs().await?;
let data = entry.to_data();
let json = serde_json::to_string_pretty(&data)?;
let metadata_path = self.entry_metadata_path(entry.mail_id());
tokio::fs::write(&metadata_path, json).await?;
let message_store = self.storage.message_store();
let mailbox_id = rusmes_storage::MailboxId::new(); message_store
.append_message(&mailbox_id, entry.mail.clone())
.await?;
Ok(())
}
async fn load_entry(&self, mail_id: &MailId) -> anyhow::Result<Option<QueueEntry>> {
let metadata_path = self.entry_metadata_path(mail_id);
if !tokio::fs::try_exists(&metadata_path).await? {
return Ok(None);
}
let json = tokio::fs::read_to_string(&metadata_path).await?;
let data: QueueEntryData = serde_json::from_str(&json)?;
let message_store = self.storage.message_store();
let mail_msg_id = rusmes_proto::MessageId::new(); if let Some(mail) = message_store.get_message(&mail_msg_id).await? {
Ok(Some(QueueEntry::from_data(data, mail)))
} else {
Ok(None)
}
}
async fn remove_entry(&self, mail_id: &MailId) -> anyhow::Result<()> {
let metadata_path = self.entry_metadata_path(mail_id);
if tokio::fs::try_exists(&metadata_path).await? {
tokio::fs::remove_file(&metadata_path).await?;
}
Ok(())
}
async fn load_all_entries(&self) -> anyhow::Result<Vec<QueueEntry>> {
self.ensure_dirs().await?;
let mut entries = Vec::new();
let mut read_dir = tokio::fs::read_dir(&self.queue_dir).await?;
while let Some(entry) = read_dir.next_entry().await? {
let path = entry.path();
if path.extension().and_then(|s| s.to_str()) == Some("json") {
let json = tokio::fs::read_to_string(&path).await?;
if let Ok(data) = serde_json::from_str::<QueueEntryData>(&json) {
let message_store = self.storage.message_store();
let mail_msg_id = rusmes_proto::MessageId::new(); if let Ok(Some(mail)) = message_store.get_message(&mail_msg_id).await {
entries.push(QueueEntry::from_data(data, mail));
}
}
}
}
Ok(entries)
}
async fn save_to_dlq(&self, entry: &QueueEntry) -> anyhow::Result<()> {
self.ensure_dirs().await?;
let data = entry.to_data();
let json = serde_json::to_string_pretty(&data)?;
let metadata_path = self.dlq_metadata_path(entry.mail_id());
tokio::fs::write(&metadata_path, json).await?;
Ok(())
}
async fn list_dlq(&self) -> anyhow::Result<Vec<QueueEntry>> {
self.ensure_dirs().await?;
let mut entries = Vec::new();
let mut read_dir = tokio::fs::read_dir(&self.dlq_dir).await?;
while let Some(entry) = read_dir.next_entry().await? {
let path = entry.path();
if path.extension().and_then(|s| s.to_str()) == Some("json") {
let json = tokio::fs::read_to_string(&path).await?;
if let Ok(data) = serde_json::from_str::<QueueEntryData>(&json) {
let message_store = self.storage.message_store();
let mail_msg_id = rusmes_proto::MessageId::new(); if let Ok(Some(mail)) = message_store.get_message(&mail_msg_id).await {
entries.push(QueueEntry::from_data(data, mail));
}
}
}
}
Ok(entries)
}
async fn remove_from_dlq(&self, mail_id: &MailId) -> anyhow::Result<()> {
let metadata_path = self.dlq_metadata_path(mail_id);
if tokio::fs::try_exists(&metadata_path).await? {
tokio::fs::remove_file(&metadata_path).await?;
}
Ok(())
}
}
#[cfg(test)]
mod tests {
use super::*;
use bytes::Bytes;
use rusmes_proto::{HeaderMap, MessageBody, MimeMessage};
fn make_mail(sender: Option<&str>, recipients: Vec<&str>) -> Mail {
let message = MimeMessage::new(HeaderMap::new(), MessageBody::Small(Bytes::from("Test")));
Mail::new(
sender.and_then(|s| s.parse().ok()),
recipients.iter().filter_map(|r| r.parse().ok()).collect(),
message,
None,
None,
)
}
#[tokio::test]
async fn test_queue_enqueue_dequeue() {
let queue = MailQueue::new();
let mail = make_mail(Some("sender@example.com"), vec!["recipient@example.com"]);
let mail_id = *mail.id();
queue.enqueue(mail).await.unwrap();
let stats = queue.stats();
assert_eq!(stats.total, 1);
assert_eq!(stats.ready, 1);
queue.mark_delivered(&mail_id).await.unwrap();
let stats = queue.stats();
assert_eq!(stats.total, 0);
}
#[test]
fn test_retry_backoff() {
let mut entry = QueueEntry::new(make_mail(None, vec![]));
entry.calculate_next_retry();
assert_eq!(entry.attempts, 1);
entry.calculate_next_retry();
assert_eq!(entry.attempts, 2);
entry.attempts = 5;
assert!(entry.is_bounced());
}
#[tokio::test]
async fn queue_priority_ordering() {
use crate::queue::priority::{Priority, PriorityQueue};
use rusmes_proto::MailId;
let mut queue = PriorityQueue::<&str>::with_default_config();
queue.enqueue(MailId::new(), "High msg", Priority::High);
queue.enqueue(MailId::new(), "Low msg 1", Priority::Low);
queue.enqueue(MailId::new(), "Low msg 2", Priority::Low);
queue.enqueue(MailId::new(), "Normal msg", Priority::Normal);
let (_, item1, p1) = queue.dequeue().unwrap();
assert_eq!(p1, Priority::High, "First dequeued should be High");
assert_eq!(item1, "High msg");
let (_, item2, p2) = queue.dequeue().unwrap();
assert_eq!(p2, Priority::Normal, "Second dequeued should be Normal");
assert_eq!(item2, "Normal msg");
let (_, _, p3) = queue.dequeue().unwrap();
assert_eq!(p3, Priority::Low, "Third dequeued should be Low");
let (_, _, p4) = queue.dequeue().unwrap();
assert_eq!(p4, Priority::Low, "Fourth dequeued should be Low");
assert!(queue.is_empty());
}
#[tokio::test]
async fn queue_stats_per_domain_counts() {
let queue = MailQueue::new();
for _ in 0..5 {
let mail = make_mail(Some("sender@x.com"), vec!["a@example.com"]);
queue.enqueue(mail).await.unwrap();
}
for _ in 0..3 {
let mail = make_mail(Some("sender@x.com"), vec!["b@example.org"]);
queue.enqueue(mail).await.unwrap();
}
let stats = queue.queue_stats_per_domain();
assert_eq!(
stats.get("example.com").copied().unwrap_or(0),
5,
"example.com should have 5 messages"
);
assert_eq!(
stats.get("example.org").copied().unwrap_or(0),
3,
"example.org should have 3 messages"
);
}
}