use std::sync::{OnceLock, RwLock};
use serde::{Deserialize, Serialize};
use crate::email::{BoxedMailer, Email};
use crate::jobs::{Job, JobError, JobQueue};
/// Returns the process-wide mailer registry, creating it (empty) on first access.
///
/// The registry is a `HashMap` behind an `RwLock`, stored in a function-local
/// `static`, so every caller observes the same map. In the code visible here the
/// only key ever inserted or looked up is `EmailJob::NAME`.
fn mailer_registry() -> &'static RwLock<std::collections::HashMap<&'static str, BoxedMailer>> {
    use std::collections::HashMap;

    static REGISTRY: OnceLock<RwLock<HashMap<&'static str, BoxedMailer>>> = OnceLock::new();

    // `RwLock::default()` wraps `HashMap::default()`, i.e. an empty map.
    REGISTRY.get_or_init(RwLock::default)
}
/// Configuration accepted by `register_email_job`.
///
/// Holds the mailer that `register_email_job` places in the registry and that
/// `EmailJob::run` later looks up to deliver messages.
#[derive(Clone)]
pub struct EmailJobConfig {
/// Mailer used to send the email carried by each `EmailJob`.
pub mailer: BoxedMailer,
}
impl EmailJobConfig {
#[must_use]
pub fn new(mailer: BoxedMailer) -> Self {
Self { mailer }
}
}
/// Job payload carrying one [`Email`] to be sent.
///
/// Derives `Serialize`/`Deserialize`, so the payload can be converted to and
/// from a serialized form.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct EmailJob {
/// The message handed to the registered mailer's `send` when the job runs.
pub email: Email,
}
#[async_trait::async_trait]
impl Job for EmailJob {
    /// Identifier of this job type; also the key under which the mailer is
    /// stored in the registry.
    const NAME: &'static str = "rustango.send_email";

    /// Attempt limit declared for this job. How the limit is enforced is up to
    /// the queue implementation, which is not visible in this file.
    const MAX_ATTEMPTS: u32 = 5;

    /// Sends `self.email` through the mailer registered under [`Self::NAME`].
    ///
    /// # Errors
    ///
    /// * `JobError::Queue` when no mailer has been registered.
    /// * `JobError::Retryable` when the mailer's `send` fails; the mailer's
    ///   error is included in the message.
    ///
    /// # Panics
    ///
    /// Panics if the registry lock is poisoned.
    async fn run(&self) -> Result<(), JobError> {
        // Clone the mailer handle inside a block so the read guard is released
        // before the `.await` below.
        let registered = {
            let registry = mailer_registry()
                .read()
                .expect("EmailJob registry poisoned");
            registry.get(Self::NAME).cloned()
        };

        let Some(mailer) = registered else {
            return Err(JobError::Queue(
                "EmailJob: no mailer registered (call register_email_job at startup)".into(),
            ));
        };

        match mailer.send(&self.email).await {
            Ok(()) => Ok(()),
            Err(e) => Err(JobError::Retryable(format!("mailer: {e}"))),
        }
    }
}
pub async fn register_email_job<Q: JobQueue>(queue: &Q, cfg: EmailJobConfig) {
mailer_registry()
.write()
.expect("EmailJob registry poisoned")
.insert(EmailJob::NAME, cfg.mailer);
queue.register::<EmailJob>().await;
}
/// Wraps a clone of `email` in an [`EmailJob`] and dispatches it on `queue`.
///
/// # Errors
///
/// Returns whatever error `queue.dispatch` reports.
pub async fn dispatch_email<Q: JobQueue>(queue: &Q, email: &Email) -> Result<(), JobError> {
    let job = EmailJob {
        email: email.clone(),
    };
    queue.dispatch(&job).await
}
/// Test-only helper that removes every mailer from the registry.
///
/// Best effort: if the registry lock is poisoned the registry is left untouched
/// and no error is reported.
#[cfg(test)]
pub fn reset_mailer_registry() {
    let Ok(mut registry) = mailer_registry().write() else {
        return;
    };
    registry.clear();
}
#[cfg(test)]
mod tests {
    use super::*;
    use crate::email::{InMemoryMailer, NullMailer};
    use crate::jobs::InMemoryJobQueue;
    use std::sync::atomic::{AtomicUsize, Ordering};
    use std::sync::Arc as StdArc;
    use std::time::Duration;
    use tokio::sync::Mutex;

    /// Pause between two consecutive checks in [`wait_until`].
    const POLL_INTERVAL: Duration = Duration::from_millis(50);

    /// Mutex taken by every test in this module: they all reset and repopulate
    /// the same global mailer registry, so they must not overlap.
    fn lock() -> &'static Mutex<()> {
        static SERIAL: std::sync::OnceLock<Mutex<()>> = std::sync::OnceLock::new();
        SERIAL.get_or_init(|| Mutex::new(()))
    }

    /// Builds the fixture message used by all tests.
    fn email() -> Email {
        Email::new()
            .from("noreply@x.com")
            .to("alice@x.com")
            .subject("Hi")
            .body("hello")
    }

    /// Checks `ready` up to `attempts` times, sleeping [`POLL_INTERVAL`] after
    /// each unsuccessful check. Returns as soon as `ready` yields `true`, or
    /// after the final sleep; callers assert the outcome themselves.
    async fn wait_until(attempts: usize, ready: impl Fn() -> bool) {
        for _ in 0..attempts {
            if ready() {
                return;
            }
            tokio::time::sleep(POLL_INTERVAL).await;
        }
    }

    /// A dispatched email reaches the registered mailer exactly once.
    #[tokio::test]
    async fn dispatch_then_worker_sends() {
        let _serial = lock().lock().await;
        reset_mailer_registry();

        let mailer = StdArc::new(InMemoryMailer::new());
        let queue = InMemoryJobQueue::with_workers(1);
        register_email_job(&queue, EmailJobConfig::new(mailer.clone())).await;
        queue.start().await;

        dispatch_email(&queue, &email()).await.unwrap();
        wait_until(50, || mailer.count() > 0).await;

        assert_eq!(mailer.count(), 1);
        let delivered = mailer.sent();
        assert_eq!(delivered[0].subject, "Hi");
        assert_eq!(delivered[0].to, vec!["alice@x.com"]);
        queue.shutdown().await;
    }

    /// With an empty registry the job ends up invoking the dead-letter hook once.
    #[tokio::test]
    async fn no_mailer_registered_returns_queue_error() {
        let _serial = lock().lock().await;
        reset_mailer_registry();

        // Register the job type directly so that no mailer is stored.
        let queue = InMemoryJobQueue::with_workers(1);
        queue.register::<EmailJob>().await;

        let dead_letters = StdArc::new(AtomicUsize::new(0));
        let counter = dead_letters.clone();
        queue
            .on_dead_letter(move |_dl| {
                let counter = counter.clone();
                async move {
                    counter.fetch_add(1, Ordering::SeqCst);
                }
            })
            .await;
        queue.start().await;

        dispatch_email(&queue, &email()).await.unwrap();
        wait_until(40, || dead_letters.load(Ordering::SeqCst) > 0).await;

        assert_eq!(dead_letters.load(Ordering::SeqCst), 1);
        queue.shutdown().await;
    }

    /// Registering a second time replaces the mailer used for delivery.
    #[tokio::test]
    async fn re_register_swaps_the_mailer() {
        let _serial = lock().lock().await;
        reset_mailer_registry();

        let first = StdArc::new(InMemoryMailer::new());
        let second = StdArc::new(InMemoryMailer::new());
        let queue = InMemoryJobQueue::with_workers(1);
        register_email_job(&queue, EmailJobConfig::new(first.clone())).await;
        register_email_job(&queue, EmailJobConfig::new(second.clone())).await;
        queue.start().await;

        dispatch_email(&queue, &email()).await.unwrap();
        wait_until(50, || second.count() > 0).await;

        assert_eq!(first.count(), 0, "old mailer should not receive");
        assert_eq!(second.count(), 1, "new mailer should receive");
        queue.shutdown().await;
    }

    /// A job sent through `NullMailer` leaves nothing pending on the queue.
    #[tokio::test]
    async fn null_mailer_succeeds_silently() {
        let _serial = lock().lock().await;
        reset_mailer_registry();

        let mailer: BoxedMailer = StdArc::new(NullMailer);
        let queue = InMemoryJobQueue::with_workers(1);
        register_email_job(&queue, EmailJobConfig::new(mailer)).await;
        queue.start().await;

        dispatch_email(&queue, &email()).await.unwrap();
        // No mailer-side counter is consulted in this test, so wait a fixed
        // interval before inspecting the queue.
        tokio::time::sleep(Duration::from_millis(100)).await;

        assert_eq!(queue.pending_count().await, 0);
        queue.shutdown().await;
    }
}