use serde::{Deserialize, Serialize};
use std::sync::Arc;
use std::time::Duration;
use tokio::sync::mpsc;
use tracing::{debug, error, info, warn};
use uuid::Uuid;
use crate::{Email, MailError, Mailer, Result};
/// A serializable unit of work: one email plus its delivery/retry state.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct EmailJob {
    /// Unique job id (UUID v4, assigned in [`EmailJob::new`]).
    pub id: String,
    /// The email payload to deliver.
    pub email: Email,
    /// Number of delivery attempts made so far.
    pub attempts: u32,
    /// Maximum retries before the job stops being rescheduled (default 3).
    pub max_retries: u32,
    /// Creation time, milliseconds since the Unix epoch.
    pub created_at: i64,
    /// Earliest time (ms since epoch) the job may run again; `None` until a retry is scheduled.
    pub next_retry_at: Option<i64>,
    /// Message of the most recent delivery error, if any.
    pub last_error: Option<String>,
    /// Scheduling priority (default 5). Lower values pop first in the Redis
    /// backend's score formula; the in-memory backend ignores it.
    pub priority: u8,
    /// Arbitrary caller-supplied key/value annotations.
    #[serde(default)]
    pub metadata: std::collections::HashMap<String, String>,
}
impl EmailJob {
    /// Wraps `email` in a fresh job: new UUID id, zero attempts,
    /// 3 max retries, priority 5, created "now".
    pub fn new(email: Email) -> Self {
        let now = chrono_now_ms();
        Self {
            id: Uuid::new_v4().to_string(),
            email,
            attempts: 0,
            max_retries: 3,
            created_at: now,
            next_retry_at: None,
            last_error: None,
            priority: 5,
            metadata: Default::default(),
        }
    }

    /// Builder: overrides the maximum number of retries.
    pub fn max_retries(mut self, retries: u32) -> Self {
        self.max_retries = retries;
        self
    }

    /// Builder: overrides the scheduling priority.
    pub fn priority(mut self, priority: u8) -> Self {
        self.priority = priority;
        self
    }

    /// Builder: attaches a single metadata key/value pair.
    pub fn metadata(mut self, key: impl Into<String>, value: impl Into<String>) -> Self {
        self.metadata.insert(key.into(), value.into());
        self
    }

    /// True while the job still has retry budget left.
    pub fn should_retry(&self) -> bool {
        self.attempts < self.max_retries
    }

    /// Records one more attempt and schedules the next run `delay` from now.
    pub fn prepare_retry(&mut self, delay: Duration) {
        self.attempts += 1;
        let due = chrono_now_ms() + delay.as_millis() as i64;
        self.next_retry_at = Some(due);
    }
}
/// Current wall-clock time as whole milliseconds since the Unix epoch.
/// Returns 0 if the system clock reports a time before the epoch
/// (matching `unwrap_or_default` in spirit: never panics).
fn chrono_now_ms() -> i64 {
    use std::time::{SystemTime, UNIX_EPOCH};
    match SystemTime::now().duration_since(UNIX_EPOCH) {
        Ok(elapsed) => elapsed.as_millis() as i64,
        Err(_) => 0,
    }
}
/// Tuning knobs for a queue and its worker pool.
#[derive(Debug, Clone)]
pub struct EmailQueueConfig {
    /// Base name for the queue; the Redis backend derives all its key names from it.
    pub queue_name: String,
    /// Number of concurrent processing tasks the worker spawns.
    pub concurrency: usize,
    /// Maximum jobs fetched from the backend per poll.
    pub batch_size: usize,
    /// Sleep between polls when the queue is empty or a fetch fails.
    pub poll_interval: Duration,
    /// Base delay for retry backoff (doubled per attempt).
    pub retry_delay: Duration,
    /// Upper bound on the computed retry delay.
    pub max_retry_delay: Duration,
    /// Whether exhausted/non-retryable jobs are parked in a dead letter store.
    pub dead_letter_queue: bool,
    /// Intended per-job processing timeout.
    // NOTE(review): job_timeout is never enforced anywhere in this file —
    // confirm whether enforcement lives elsewhere or is still TODO.
    pub job_timeout: Duration,
}
impl Default for EmailQueueConfig {
fn default() -> Self {
Self {
queue_name: "armature:email:queue".to_string(),
concurrency: 4,
batch_size: 10,
poll_interval: Duration::from_secs(1),
retry_delay: Duration::from_secs(5),
max_retry_delay: Duration::from_secs(300),
dead_letter_queue: true,
job_timeout: Duration::from_secs(60),
}
}
}
impl EmailQueueConfig {
    /// Builder: sets the base queue name.
    pub fn queue_name(mut self, name: impl Into<String>) -> Self {
        self.queue_name = name.into();
        self
    }
    /// Builder: sets the number of concurrent processing tasks.
    pub fn concurrency(mut self, concurrency: usize) -> Self {
        self.concurrency = concurrency;
        self
    }
    /// Builder: sets the maximum jobs fetched per poll.
    pub fn batch_size(mut self, size: usize) -> Self {
        self.batch_size = size;
        self
    }
    /// Builder: sets the idle/error polling interval.
    pub fn poll_interval(mut self, interval: Duration) -> Self {
        self.poll_interval = interval;
        self
    }
    /// Builder: sets the base retry delay.
    pub fn retry_delay(mut self, delay: Duration) -> Self {
        self.retry_delay = delay;
        self
    }
    /// Builder: caps the exponential retry backoff.
    /// (Added for consistency — the field previously had no builder.)
    pub fn max_retry_delay(mut self, delay: Duration) -> Self {
        self.max_retry_delay = delay;
        self
    }
    /// Builder: enables/disables the dead letter queue.
    pub fn dead_letter_queue(mut self, enabled: bool) -> Self {
        self.dead_letter_queue = enabled;
        self
    }
    /// Builder: sets the intended per-job timeout.
    /// (Added for consistency — the field previously had no builder.)
    pub fn job_timeout(mut self, timeout: Duration) -> Self {
        self.job_timeout = timeout;
        self
    }
}
/// Storage abstraction for the email queue. Implementations must be safe
/// to share across the worker's concurrent tasks (`Send + Sync`).
#[async_trait::async_trait]
pub trait EmailQueueBackend: Send + Sync {
    /// Enqueues a job.
    async fn push(&self, job: EmailJob) -> Result<()>;
    /// Removes and returns up to `count` jobs that are due for processing.
    async fn pop(&self, count: usize) -> Result<Vec<EmailJob>>;
    /// Marks a job as successfully processed.
    async fn complete(&self, job_id: &str) -> Result<()>;
    /// Re-enqueues a failed job for a later retry, recording `error`.
    async fn fail(&self, job: EmailJob, error: &str) -> Result<()>;
    /// Parks a job in the dead letter store.
    async fn dead_letter(&self, job: EmailJob) -> Result<()>;
    /// Returns a point-in-time snapshot of queue counters.
    async fn stats(&self) -> Result<QueueStats>;
}
/// Point-in-time queue counters reported by a backend.
#[derive(Debug, Clone, Default, Serialize, Deserialize)]
pub struct QueueStats {
    /// Jobs waiting to be picked up.
    pub pending: u64,
    /// Jobs currently being processed (always 0 for the backends in this file).
    pub processing: u64,
    /// Jobs scheduled for a future retry.
    pub retrying: u64,
    /// Jobs parked in the dead letter store.
    pub dead_letter: u64,
    /// Total jobs completed successfully.
    pub processed: u64,
}
/// Non-persistent backend for tests and single-process deployments;
/// all queued jobs are lost when the process exits.
pub struct InMemoryBackend {
    /// Live jobs: both fresh ones (no `next_retry_at`) and retry-scheduled ones.
    queue: tokio::sync::Mutex<std::collections::VecDeque<EmailJob>>,
    /// Jobs that exhausted their retries or failed non-retryably.
    dead_letter: tokio::sync::Mutex<Vec<EmailJob>>,
    /// Count of successfully completed jobs.
    processed: std::sync::atomic::AtomicU64,
}
impl InMemoryBackend {
pub fn new() -> Self {
Self {
queue: tokio::sync::Mutex::new(std::collections::VecDeque::new()),
dead_letter: tokio::sync::Mutex::new(Vec::new()),
processed: std::sync::atomic::AtomicU64::new(0),
}
}
}
impl Default for InMemoryBackend {
    /// Same as [`InMemoryBackend::new`].
    fn default() -> Self {
        Self::new()
    }
}
#[async_trait::async_trait]
impl EmailQueueBackend for InMemoryBackend {
    /// Appends the job to the back of the queue.
    async fn push(&self, job: EmailJob) -> Result<()> {
        self.queue.lock().await.push_back(job);
        Ok(())
    }

    /// Scans from the front, removing up to `count` jobs that are due
    /// (no `next_retry_at`, or one that is at/past "now"). Jobs waiting on
    /// a future retry time are skipped in place.
    async fn pop(&self, count: usize) -> Result<Vec<EmailJob>> {
        let mut queue = self.queue.lock().await;
        let now = chrono_now_ms();
        let mut ready = Vec::with_capacity(count);
        let mut idx = 0;
        while ready.len() < count && idx < queue.len() {
            let due = match queue[idx].next_retry_at {
                Some(ts) => ts <= now,
                None => true,
            };
            if due {
                // `remove` shifts the next candidate into `idx`, so the
                // index is only advanced when we skip.
                if let Some(job) = queue.remove(idx) {
                    ready.push(job);
                }
            } else {
                idx += 1;
            }
        }
        Ok(ready)
    }

    /// Payloads are not stored separately here, so completion only bumps
    /// the processed counter.
    async fn complete(&self, _job_id: &str) -> Result<()> {
        self.processed
            .fetch_add(1, std::sync::atomic::Ordering::Relaxed);
        Ok(())
    }

    /// Records the error and re-queues the job; its `next_retry_at`
    /// (set by the caller) keeps it parked until due.
    async fn fail(&self, mut job: EmailJob, error: &str) -> Result<()> {
        job.last_error = Some(error.to_string());
        self.queue.lock().await.push_back(job);
        Ok(())
    }

    /// Moves the job to the dead letter list.
    async fn dead_letter(&self, job: EmailJob) -> Result<()> {
        self.dead_letter.lock().await.push(job);
        Ok(())
    }

    /// Counts queued jobs as "retrying" when parked on a future retry time,
    /// otherwise "pending".
    async fn stats(&self) -> Result<QueueStats> {
        let queue = self.queue.lock().await;
        let dl = self.dead_letter.lock().await;
        let now = chrono_now_ms();
        let mut pending: u64 = 0;
        let mut retrying: u64 = 0;
        for job in queue.iter() {
            match job.next_retry_at {
                Some(ts) if ts > now => retrying += 1,
                _ => pending += 1,
            }
        }
        Ok(QueueStats {
            pending,
            processing: 0,
            retrying,
            dead_letter: dl.len() as u64,
            processed: self.processed.load(std::sync::atomic::Ordering::Relaxed),
        })
    }
}
/// Redis-backed queue: job payloads live under per-job string keys, ids are
/// scheduled through sorted sets (pending/retry), dead letters in a list.
#[cfg(feature = "redis")]
pub struct RedisBackend {
    /// Shared Redis connection service.
    redis: Arc<armature_redis::RedisService>,
    /// Source of the queue name all Redis keys are derived from.
    config: EmailQueueConfig,
}
#[cfg(feature = "redis")]
impl RedisBackend {
    /// Creates a backend over an existing Redis service.
    pub fn new(redis: Arc<armature_redis::RedisService>, config: EmailQueueConfig) -> Self {
        Self { redis, config }
    }
    /// Sorted set of job ids awaiting first delivery (scored by priority + created_at).
    fn pending_key(&self) -> String {
        format!("{}:pending", self.config.queue_name)
    }
    /// Sorted set of job ids awaiting retry (scored by next-retry timestamp, ms).
    fn retry_key(&self) -> String {
        format!("{}:retry", self.config.queue_name)
    }
    /// List of dead-lettered job payloads (full JSON).
    fn dead_letter_key(&self) -> String {
        format!("{}:dead", self.config.queue_name)
    }
    /// String key holding one job's serialized JSON payload.
    fn job_key(&self, id: &str) -> String {
        format!("{}:job:{}", self.config.queue_name, id)
    }
    /// Hash of queue counters (field "processed").
    fn stats_key(&self) -> String {
        format!("{}:stats", self.config.queue_name)
    }
}
#[cfg(feature = "redis")]
#[async_trait::async_trait]
impl EmailQueueBackend for RedisBackend {
    /// Stores the job JSON under its own key, then schedules the id in the
    /// pending sorted set. Score is `priority * 1e9 + created_at` (ms), so a
    /// lower priority value pops first; within equal priority, older first.
    async fn push(&self, job: EmailJob) -> Result<()> {
        let job_json = serde_json::to_string(&job)?;
        let score = job.priority as f64 * 1_000_000_000.0 + job.created_at as f64;
        let mut conn = self
            .redis
            .get()
            .await
            .map_err(|e| MailError::Queue(e.to_string()))?;
        redis::cmd("SET")
            .arg(&self.job_key(&job.id))
            .arg(&job_json)
            .query_async::<()>(&mut *conn)
            .await
            .map_err(|e| MailError::Queue(e.to_string()))?;
        redis::cmd("ZADD")
            .arg(&self.pending_key())
            .arg(score)
            .arg(&job.id)
            .query_async::<()>(&mut *conn)
            .await
            .map_err(|e| MailError::Queue(e.to_string()))?;
        debug!(job_id = %job.id, "Email job enqueued");
        Ok(())
    }

    /// Claims up to `count` pending ids plus up to `count` due retry ids,
    /// then loads and deserializes each payload. Jobs whose payload is
    /// missing or corrupt are logged and skipped, not retried.
    /// Note: the combined result may exceed `count` (pending + retry).
    async fn pop(&self, count: usize) -> Result<Vec<EmailJob>> {
        let mut conn = self
            .redis
            .get()
            .await
            .map_err(|e| MailError::Queue(e.to_string()))?;
        let now = chrono_now_ms() as f64;
        // FIX: `ZPOPMIN key count` replies with a flat
        // [member, score, member, score, ...] array. Decoding that into
        // Vec<String> interleaved the score strings into the id list, which
        // then issued GETs for nonexistent "score" job keys. Decode as
        // (member, score) pairs and keep only the members.
        let popped: Vec<(String, f64)> = redis::cmd("ZPOPMIN")
            .arg(&self.pending_key())
            .arg(count)
            .query_async(&mut *conn)
            .await
            .map_err(|e| MailError::Queue(e.to_string()))?;
        let job_ids: Vec<String> = popped.into_iter().map(|(id, _score)| id).collect();
        // Retry entries are due once their score (next_retry_at, ms) <= now.
        let retry_ids: Vec<String> = redis::cmd("ZRANGEBYSCORE")
            .arg(&self.retry_key())
            .arg(0.0)
            .arg(now)
            .arg("LIMIT")
            .arg(0)
            .arg(count)
            .query_async(&mut *conn)
            .await
            .map_err(|e| MailError::Queue(e.to_string()))?;
        if !retry_ids.is_empty() {
            // ZRANGEBYSCORE does not remove entries; claim them explicitly.
            // NOTE(review): range-then-remove is not atomic — two workers can
            // read the same due ids before either ZREMs. Consider ZPOPMIN on
            // the retry set or a Lua script if duplicate sends matter.
            redis::cmd("ZREM")
                .arg(&self.retry_key())
                .arg(&retry_ids)
                .query_async::<()>(&mut *conn)
                .await
                .map_err(|e| MailError::Queue(e.to_string()))?;
        }
        let mut jobs = Vec::new();
        for id in job_ids.into_iter().chain(retry_ids.into_iter()) {
            let job_json: Option<String> = redis::cmd("GET")
                .arg(&self.job_key(&id))
                .query_async(&mut *conn)
                .await
                .map_err(|e| MailError::Queue(e.to_string()))?;
            if let Some(json) = job_json {
                match serde_json::from_str(&json) {
                    Ok(job) => jobs.push(job),
                    Err(e) => error!(job_id = %id, error = %e, "Failed to deserialize job"),
                }
            }
        }
        Ok(jobs)
    }

    /// Deletes the job payload and bumps the "processed" counter.
    async fn complete(&self, job_id: &str) -> Result<()> {
        let mut conn = self
            .redis
            .get()
            .await
            .map_err(|e| MailError::Queue(e.to_string()))?;
        redis::cmd("DEL")
            .arg(&self.job_key(job_id))
            .query_async::<()>(&mut *conn)
            .await
            .map_err(|e| MailError::Queue(e.to_string()))?;
        redis::cmd("HINCRBY")
            .arg(&self.stats_key())
            .arg("processed")
            .arg(1)
            .query_async::<()>(&mut *conn)
            .await
            .map_err(|e| MailError::Queue(e.to_string()))?;
        debug!(job_id = %job_id, "Email job completed");
        Ok(())
    }

    /// Records the error on the job, persists the updated payload, and
    /// schedules the id in the retry set at `next_retry_at` (or now if the
    /// caller did not set one).
    async fn fail(&self, mut job: EmailJob, error: &str) -> Result<()> {
        job.last_error = Some(error.to_string());
        let job_json = serde_json::to_string(&job)?;
        let mut conn = self
            .redis
            .get()
            .await
            .map_err(|e| MailError::Queue(e.to_string()))?;
        redis::cmd("SET")
            .arg(&self.job_key(&job.id))
            .arg(&job_json)
            .query_async::<()>(&mut *conn)
            .await
            .map_err(|e| MailError::Queue(e.to_string()))?;
        let score = job.next_retry_at.unwrap_or_else(chrono_now_ms) as f64;
        redis::cmd("ZADD")
            .arg(&self.retry_key())
            .arg(score)
            .arg(&job.id)
            .query_async::<()>(&mut *conn)
            .await
            .map_err(|e| MailError::Queue(e.to_string()))?;
        debug!(job_id = %job.id, attempts = job.attempts, "Email job scheduled for retry");
        Ok(())
    }

    /// Pushes the full job JSON onto the dead letter list and removes the
    /// per-job payload key.
    async fn dead_letter(&self, job: EmailJob) -> Result<()> {
        let job_json = serde_json::to_string(&job)?;
        let mut conn = self
            .redis
            .get()
            .await
            .map_err(|e| MailError::Queue(e.to_string()))?;
        redis::cmd("LPUSH")
            .arg(&self.dead_letter_key())
            .arg(&job_json)
            .query_async::<()>(&mut *conn)
            .await
            .map_err(|e| MailError::Queue(e.to_string()))?;
        redis::cmd("DEL")
            .arg(&self.job_key(&job.id))
            .query_async::<()>(&mut *conn)
            .await
            .map_err(|e| MailError::Queue(e.to_string()))?;
        warn!(job_id = %job.id, "Email job moved to dead letter queue");
        Ok(())
    }

    /// Reads the counters; individual read failures (e.g. a missing stats
    /// field) degrade to 0 rather than failing the whole snapshot.
    async fn stats(&self) -> Result<QueueStats> {
        let mut conn = self
            .redis
            .get()
            .await
            .map_err(|e| MailError::Queue(e.to_string()))?;
        let pending: u64 = redis::cmd("ZCARD")
            .arg(&self.pending_key())
            .query_async(&mut *conn)
            .await
            .unwrap_or(0);
        let retrying: u64 = redis::cmd("ZCARD")
            .arg(&self.retry_key())
            .query_async(&mut *conn)
            .await
            .unwrap_or(0);
        let dead_letter: u64 = redis::cmd("LLEN")
            .arg(&self.dead_letter_key())
            .query_async(&mut *conn)
            .await
            .unwrap_or(0);
        let processed: u64 = redis::cmd("HGET")
            .arg(&self.stats_key())
            .arg("processed")
            .query_async(&mut *conn)
            .await
            .unwrap_or(0);
        Ok(QueueStats {
            pending,
            processing: 0,
            retrying,
            dead_letter,
            processed,
        })
    }
}
/// Handle for enqueueing emails onto a backend and creating workers.
pub struct EmailQueue {
    /// Shared storage backend (also handed to workers).
    backend: Arc<dyn EmailQueueBackend>,
    /// Settings propagated to workers created via [`EmailQueue::worker`].
    config: EmailQueueConfig,
}
impl EmailQueue {
    /// Creates a queue backed by process-local memory (jobs are lost on exit).
    pub fn in_memory(config: EmailQueueConfig) -> Self {
        Self::with_backend(InMemoryBackend::new(), config)
    }

    /// Creates a Redis-backed queue; key names derive from `config.queue_name`.
    #[cfg(feature = "redis")]
    pub fn redis(redis: Arc<armature_redis::RedisService>, config: EmailQueueConfig) -> Self {
        let backend = RedisBackend::new(redis, config.clone());
        Self::with_backend(backend, config)
    }

    /// Creates a queue over any custom backend implementation.
    pub fn with_backend(
        backend: impl EmailQueueBackend + 'static,
        config: EmailQueueConfig,
    ) -> Self {
        Self {
            backend: Arc::new(backend),
            config,
        }
    }

    /// Wraps `email` in a job with default settings and enqueues it,
    /// returning the generated job id.
    pub async fn enqueue(&self, email: Email) -> Result<String> {
        self.enqueue_job(EmailJob::new(email)).await
    }

    /// Enqueues a pre-built job (custom priority, retries, metadata),
    /// returning its id.
    pub async fn enqueue_job(&self, job: EmailJob) -> Result<String> {
        let id = job.id.clone();
        self.backend.push(job).await?;
        Ok(id)
    }

    /// Enqueues each email in order; fails fast on the first push error
    /// (already-enqueued jobs remain in the queue).
    pub async fn enqueue_batch(&self, emails: Vec<Email>) -> Result<Vec<String>> {
        let mut ids = Vec::with_capacity(emails.len());
        for email in emails {
            ids.push(self.enqueue(email).await?);
        }
        Ok(ids)
    }

    /// Snapshot of the backend's counters.
    pub async fn stats(&self) -> Result<QueueStats> {
        self.backend.stats().await
    }

    /// Builds a worker consuming this queue that delivers via `mailer`;
    /// call [`EmailQueueWorker::run`] to start it.
    pub fn worker(&self, mailer: Arc<Mailer>) -> EmailQueueWorker {
        EmailQueueWorker {
            queue: Arc::clone(&self.backend),
            mailer,
            config: self.config.clone(),
            shutdown: None,
        }
    }
}
/// Background consumer that pulls jobs from a queue backend and delivers
/// them through a [`Mailer`].
pub struct EmailQueueWorker {
    /// Backend shared with the owning [`EmailQueue`].
    queue: Arc<dyn EmailQueueBackend>,
    /// Mailer used to send each job's email.
    mailer: Arc<Mailer>,
    /// Concurrency, polling, and backoff settings.
    config: EmailQueueConfig,
    /// Optional stop signal; polled between fetch batches.
    shutdown: Option<tokio::sync::broadcast::Receiver<()>>,
}
impl EmailQueueWorker {
pub fn with_shutdown(mut self, shutdown: tokio::sync::broadcast::Receiver<()>) -> Self {
self.shutdown = Some(shutdown);
self
}
pub async fn run(mut self) {
info!(
concurrency = self.config.concurrency,
queue = %self.config.queue_name,
"Email queue worker started"
);
let (job_tx, job_rx) = async_channel::bounded::<EmailJob>(self.config.batch_size * 2);
let job_rx = Arc::new(job_rx);
let mut handles = Vec::new();
for i in 0..self.config.concurrency {
let rx = job_rx.clone();
let queue = self.queue.clone();
let mailer = self.mailer.clone();
let config = self.config.clone();
handles.push(tokio::spawn(async move {
Self::process_jobs(i, rx, queue, mailer, config).await;
}));
}
loop {
if let Some(ref mut shutdown) = self.shutdown {
if shutdown.try_recv().is_ok() {
info!("Email queue worker shutting down");
break;
}
}
match self.queue.pop(self.config.batch_size).await {
Ok(jobs) => {
if jobs.is_empty() {
tokio::time::sleep(self.config.poll_interval).await;
} else {
for job in jobs {
if job_tx.send(job).await.is_err() {
break;
}
}
}
}
Err(e) => {
error!(error = %e, "Failed to fetch jobs from queue");
tokio::time::sleep(self.config.poll_interval).await;
}
}
}
drop(job_tx);
for handle in handles {
let _ = handle.await;
}
info!("Email queue worker stopped");
}
async fn process_jobs(
worker_id: usize,
rx: Arc<async_channel::Receiver<EmailJob>>,
queue: Arc<dyn EmailQueueBackend>,
mailer: Arc<Mailer>,
config: EmailQueueConfig,
) {
while let Ok(mut job) = rx.recv().await {
debug!(worker = worker_id, job_id = %job.id, "Processing email job");
match mailer.send(job.email.clone()).await {
Ok(()) => {
if let Err(e) = queue.complete(&job.id).await {
error!(job_id = %job.id, error = %e, "Failed to mark job complete");
}
}
Err(e) => {
let error_msg = e.to_string();
if job.should_retry() && e.is_retryable() {
let delay = Self::calculate_backoff(&config, job.attempts);
job.prepare_retry(delay);
if let Err(err) = queue.fail(job, &error_msg).await {
error!(error = %err, "Failed to schedule job retry");
}
} else if config.dead_letter_queue {
job.last_error = Some(error_msg);
if let Err(err) = queue.dead_letter(job).await {
error!(error = %err, "Failed to move job to dead letter queue");
}
}
}
}
}
}
fn calculate_backoff(config: &EmailQueueConfig, attempts: u32) -> Duration {
let base_delay = config.retry_delay.as_secs_f64();
let delay = base_delay * 2_f64.powi(attempts as i32);
let delay = delay.min(config.max_retry_delay.as_secs_f64());
Duration::from_secs_f64(delay)
}
}
/// Extension trait for constructing email queues from a [`Mailer`].
pub trait MailerQueueExt {
    /// Creates an in-memory queue with the given configuration.
    fn queue(&self, config: EmailQueueConfig) -> EmailQueue;
    /// Creates a Redis-backed queue with the given configuration.
    #[cfg(feature = "redis")]
    fn queue_redis(
        &self,
        redis: Arc<armature_redis::RedisService>,
        config: EmailQueueConfig,
    ) -> EmailQueue;
}
impl MailerQueueExt for Mailer {
    /// Builds an in-memory queue. Note the mailer itself is not captured —
    /// pass it to [`EmailQueue::worker`] to actually process jobs.
    fn queue(&self, config: EmailQueueConfig) -> EmailQueue {
        EmailQueue::in_memory(config)
    }
    /// Builds a Redis-backed queue; like `queue`, the mailer is not captured.
    #[cfg(feature = "redis")]
    fn queue_redis(
        &self,
        redis: Arc<armature_redis::RedisService>,
        config: EmailQueueConfig,
    ) -> EmailQueue {
        EmailQueue::redis(redis, config)
    }
}
/// Minimal bounded multi-consumer channel built on `tokio::sync::mpsc`:
/// one sender plus cloneable receivers that share the single inner receiver
/// behind an async mutex, so each message goes to exactly one consumer.
#[allow(dead_code)]
mod async_channel {
    use std::sync::Arc;
    use tokio::sync::{Mutex, mpsc};

    /// Sending half; `send` errors once all receivers are gone.
    pub struct Sender<T> {
        tx: mpsc::Sender<T>,
    }

    /// Cloneable receiving half; clones compete for messages.
    pub struct Receiver<T> {
        rx: Arc<Mutex<mpsc::Receiver<T>>>,
    }

    /// Creates a channel holding at most `size` in-flight messages.
    pub fn bounded<T>(size: usize) -> (Sender<T>, Receiver<T>) {
        let (tx, rx) = mpsc::channel(size);
        let shared = Arc::new(Mutex::new(rx));
        (Sender { tx }, Receiver { rx: shared })
    }

    impl<T> Sender<T> {
        /// Sends `value`, waiting for capacity; `Err(())` if the channel is
        /// closed (the sent value is dropped).
        pub async fn send(&self, value: T) -> Result<(), ()> {
            match self.tx.send(value).await {
                Ok(()) => Ok(()),
                Err(_) => Err(()),
            }
        }
    }

    impl<T> Clone for Receiver<T> {
        fn clone(&self) -> Self {
            Receiver {
                rx: Arc::clone(&self.rx),
            }
        }
    }

    impl<T> Receiver<T> {
        /// Awaits the next message; `Err(())` once the sender is dropped and
        /// the buffer is drained.
        pub async fn recv(&self) -> Result<T, ()> {
            let mut inner = self.rx.lock().await;
            inner.recv().await.ok_or(())
        }
    }
}