use serde::{Deserialize, Serialize};
use std::collections::HashMap;
use std::sync::Arc;
use thiserror::Error;
use tokio::io::{AsyncBufReadExt, AsyncWriteExt};
use tokio::sync::{Mutex, Notify};
#[derive(Error, Debug)]
pub enum EmailError {
#[error("Email configuration error: {0}")]
ConfigError(String),
#[error("SMTP error: {0}")]
SmtpError(String),
#[error("Invalid message: {0}")]
InvalidMessage(String),
#[error("Template error: {0}")]
TemplateError(String),
#[error("Queue error: {0}")]
QueueError(String),
}
pub type EmailResult<T> = Result<T, EmailError>;
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct EmailConfig {
pub smtp_host: String,
pub smtp_port: u16,
pub username: String,
pub password: String,
pub from_address: String,
pub from_name: String,
pub tls: bool,
}
impl EmailConfig {
pub fn new(
smtp_host: impl Into<String>,
smtp_port: u16,
username: impl Into<String>,
password: impl Into<String>,
from_address: impl Into<String>,
) -> Self {
Self {
smtp_host: smtp_host.into(),
smtp_port,
username: username.into(),
password: password.into(),
from_address: from_address.into(),
from_name: String::new(),
tls: true,
}
}
pub fn from_name(mut self, name: impl Into<String>) -> Self {
self.from_name = name.into();
self
}
pub fn tls(mut self, tls: bool) -> Self {
self.tls = tls;
self
}
pub fn validate(&self) -> EmailResult<()> {
if self.smtp_host.is_empty() {
return Err(EmailError::ConfigError("smtp_host is required".into()));
}
if self.from_address.is_empty() || !self.from_address.contains('@') {
return Err(EmailError::ConfigError("from_address must be a valid email".into()));
}
Ok(())
}
}
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct Attachment {
pub name: String,
pub content_type: String,
pub data: Vec<u8>,
}
impl Attachment {
pub fn new(name: impl Into<String>, content_type: impl Into<String>, data: Vec<u8>) -> Self {
Self {
name: name.into(),
content_type: content_type.into(),
data,
}
}
}
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct EmailMessage {
pub to: Vec<String>,
pub cc: Vec<String>,
pub bcc: Vec<String>,
pub subject: String,
pub body: String,
pub html_body: Option<String>,
pub attachments: Vec<Attachment>,
}
impl Default for EmailMessage {
fn default() -> Self {
Self::new()
}
}
impl EmailMessage {
pub fn new() -> Self {
Self {
to: Vec::new(),
cc: Vec::new(),
bcc: Vec::new(),
subject: String::new(),
body: String::new(),
html_body: None,
attachments: Vec::new(),
}
}
pub fn to(mut self, addr: impl Into<String>) -> Self {
self.to.push(addr.into());
self
}
pub fn cc(mut self, addr: impl Into<String>) -> Self {
self.cc.push(addr.into());
self
}
pub fn bcc(mut self, addr: impl Into<String>) -> Self {
self.bcc.push(addr.into());
self
}
pub fn subject(mut self, subject: impl Into<String>) -> Self {
self.subject = subject.into();
self
}
pub fn body(mut self, body: impl Into<String>) -> Self {
self.body = body.into();
self
}
pub fn html_body(mut self, html: impl Into<String>) -> Self {
self.html_body = Some(html.into());
self
}
pub fn attachment(mut self, attachment: Attachment) -> Self {
self.attachments.push(attachment);
self
}
pub fn validate(&self) -> EmailResult<()> {
if self.to.is_empty() && self.cc.is_empty() && self.bcc.is_empty() {
return Err(EmailError::InvalidMessage(
"Email must have at least one recipient".into(),
));
}
if self.subject.is_empty() {
return Err(EmailError::InvalidMessage("Subject is required".into()));
}
Ok(())
}
pub fn recipient_count(&self) -> usize {
self.to.len() + self.cc.len() + self.bcc.len()
}
}
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct EmailTemplate {
pub template: String,
pub variables: HashMap<String, String>,
}
impl EmailTemplate {
pub fn new(template: impl Into<String>) -> Self {
Self {
template: template.into(),
variables: HashMap::new(),
}
}
pub fn variable(mut self, key: impl Into<String>, value: impl Into<String>) -> Self {
self.variables.insert(key.into(), value.into());
self
}
pub fn variables(mut self, vars: HashMap<String, String>) -> Self {
self.variables = vars;
self
}
pub fn render(&self) -> EmailResult<String> {
let mut result = self.template.clone();
for (key, value) in &self.variables {
let placeholder = format!("{{{{{}}}}}", key);
result = result.replace(&placeholder, value);
}
Ok(result)
}
pub fn render_message(
&self,
to: impl Into<String>,
subject: impl Into<String>,
) -> EmailResult<EmailMessage> {
let body = self.render()?;
Ok(EmailMessage::new().to(to).subject(subject).body(body))
}
}
#[async_trait::async_trait]
pub trait EmailSender: Send + Sync {
async fn send(&self, message: EmailMessage) -> EmailResult<()>;
}
#[derive(Debug, Clone)]
pub struct SmtpEmailSender {
config: EmailConfig,
}
impl SmtpEmailSender {
pub fn new(config: EmailConfig) -> Self {
Self { config }
}
pub fn config(&self) -> &EmailConfig {
&self.config
}
async fn read_response<R: tokio::io::AsyncBufRead + Unpin>(reader: &mut R) -> EmailResult<u16> {
let mut line = String::new();
let mut code: u16;
loop {
line.clear();
let n = reader
.read_line(&mut line)
.await
.map_err(|e| EmailError::SmtpError(format!("failed to read SMTP response: {e}")))?;
if n == 0 {
return Err(EmailError::SmtpError("SMTP connection closed unexpectedly".into()));
}
if line.len() >= 4 {
code = line[..3]
.parse::<u16>()
.map_err(|_| EmailError::SmtpError(format!("invalid SMTP response: {line}")))?;
if line.as_bytes()[3] == b' ' {
break;
}
}
}
Ok(code)
}
async fn send_command<W: tokio::io::AsyncWrite + Unpin, R: tokio::io::AsyncBufRead + Unpin>(
writer: &mut W,
reader: &mut R,
command: &str,
) -> EmailResult<u16> {
writer
.write_all(command.as_bytes())
.await
.map_err(|e| EmailError::SmtpError(format!("failed to write SMTP command: {e}")))?;
writer
.flush()
.await
.map_err(|e| EmailError::SmtpError(format!("failed to flush SMTP command: {e}")))?;
Self::read_response(reader).await
}
}
#[async_trait::async_trait]
impl EmailSender for SmtpEmailSender {
async fn send(&self, message: EmailMessage) -> EmailResult<()> {
self.config.validate()?;
message.validate()?;
let addr = format!("{}:{}", self.config.smtp_host, self.config.smtp_port);
let stream = tokio::net::TcpStream::connect(&addr).await.map_err(|e| {
EmailError::SmtpError(format!("failed to connect to SMTP server {addr}: {e}"))
})?;
let (reader, mut writer) = tokio::io::split(stream);
let mut reader = tokio::io::BufReader::new(reader);
let greeting_code = Self::read_response(&mut reader).await?;
if !(greeting_code >= 200 && greeting_code < 300) {
return Err(EmailError::SmtpError(format!(
"SMTP server greeting failed with code {greeting_code}"
)));
}
let hostname = "hiver.local";
let ehlo_code =
Self::send_command(&mut writer, &mut reader, &format!("EHLO {hostname}\r\n")).await?;
if ehlo_code != 250 {
return Err(EmailError::SmtpError(format!("SMTP EHLO rejected with code {ehlo_code}")));
}
let mail_from_cmd = format!("MAIL FROM:<{}>\r\n", self.config.from_address);
let mail_code = Self::send_command(&mut writer, &mut reader, &mail_from_cmd).await?;
if mail_code != 250 {
return Err(EmailError::SmtpError(format!(
"SMTP MAIL FROM rejected with code {mail_code}"
)));
}
for recipient in message
.to
.iter()
.chain(message.cc.iter())
.chain(message.bcc.iter())
{
let rcpt_cmd = format!("RCPT TO:<{recipient}>\r\n");
let rcpt_code = Self::send_command(&mut writer, &mut reader, &rcpt_cmd).await?;
if rcpt_code != 250 {
return Err(EmailError::SmtpError(format!(
"SMTP RCPT TO <{recipient}> rejected with code {rcpt_code}"
)));
}
}
let data_code = Self::send_command(&mut writer, &mut reader, "DATA\r\n").await?;
if data_code != 354 {
return Err(EmailError::SmtpError(format!("SMTP DATA rejected with code {data_code}")));
}
let mut data_payload = String::new();
data_payload.push_str(&format!("From: {}\r\n", self.config.from_address));
for to_addr in &message.to {
data_payload.push_str(&format!("To: {to_addr}\r\n"));
}
for cc_addr in &message.cc {
data_payload.push_str(&format!("Cc: {cc_addr}\r\n"));
}
data_payload.push_str(&format!("Subject: {}\r\n", message.subject));
data_payload.push_str("Content-Type: text/plain; charset=utf-8\r\n");
data_payload.push_str("\r\n");
for line in message.body.lines() {
if line.starts_with('.') {
data_payload.push('.');
}
data_payload.push_str(line);
data_payload.push_str("\r\n");
}
if !message.body.ends_with('\n') {
data_payload.push_str("\r\n");
}
data_payload.push_str(".\r\n");
writer
.write_all(data_payload.as_bytes())
.await
.map_err(|e| {
EmailError::SmtpError(format!("failed to write SMTP DATA payload: {e}"))
})?;
writer.flush().await.map_err(|e| {
EmailError::SmtpError(format!("failed to flush SMTP DATA payload: {e}"))
})?;
let data_end_code = Self::read_response(&mut reader).await?;
if data_end_code != 250 {
return Err(EmailError::SmtpError(format!(
"SMTP DATA end rejected with code {data_end_code}"
)));
}
let _ = Self::send_command(&mut writer, &mut reader, "QUIT\r\n").await;
tracing::info!(
"[SMTP] Sent email via {}:{} | From: {} | To: {:?} | Subject: {}",
self.config.smtp_host,
self.config.smtp_port,
self.config.from_address,
message.to,
message.subject,
);
Ok(())
}
}
#[derive(Debug)]
pub struct EmailQueue {
sender: Arc<Mutex<Vec<EmailMessage>>>,
notify: Arc<Notify>,
}
impl Default for EmailQueue {
fn default() -> Self {
Self::new()
}
}
impl EmailQueue {
pub fn new() -> Self {
Self {
sender: Arc::new(Mutex::new(Vec::new())),
notify: Arc::new(Notify::new()),
}
}
pub async fn enqueue(&self, message: EmailMessage) {
let mut queue = self.sender.lock().await;
queue.push(message);
self.notify.notify_one();
}
pub async fn process_queue(&self, sender: &dyn EmailSender) -> EmailResult<usize> {
let mut queue = self.sender.lock().await;
let batch = std::mem::take(&mut *queue);
drop(queue);
let mut sent = 0usize;
for message in &batch {
if sender.send(message.clone()).await.is_ok() {
sent += 1;
}
}
Ok(sent)
}
pub async fn len(&self) -> usize {
self.sender.lock().await.len()
}
pub async fn is_empty(&self) -> bool {
self.sender.lock().await.is_empty()
}
pub async fn clear(&self) {
self.sender.lock().await.clear();
}
}
#[cfg(test)]
mod tests {
use super::*;
#[test]
fn test_config_new() {
let cfg = EmailConfig::new("smtp.host", 587, "user", "pass", "from@host");
assert_eq!(cfg.smtp_host, "smtp.host");
assert_eq!(cfg.smtp_port, 587);
assert!(cfg.tls);
}
#[test]
fn test_config_builder() {
let cfg = EmailConfig::new("smtp.host", 25, "u", "p", "f@h")
.from_name("Hiver")
.tls(false);
assert_eq!(cfg.from_name, "Hiver");
assert!(!cfg.tls);
}
#[test]
fn test_config_validate_ok() {
let cfg = EmailConfig::new("smtp.host", 587, "u", "p", "from@host");
assert!(cfg.validate().is_ok());
}
#[test]
fn test_config_validate_missing_host() {
let cfg = EmailConfig::new("", 587, "u", "p", "from@host");
assert!(cfg.validate().is_err());
}
#[test]
fn test_config_validate_invalid_from() {
let cfg = EmailConfig::new("smtp.host", 587, "u", "p", "no-at-sign");
assert!(cfg.validate().is_err());
}
#[test]
fn test_config_validate_empty_from() {
let cfg = EmailConfig::new("smtp.host", 587, "u", "p", "");
assert!(cfg.validate().is_err());
}
#[test]
fn test_config_serialization() {
let cfg = EmailConfig::new("smtp.host", 587, "u", "p", "from@host");
let json = serde_json::to_string(&cfg).unwrap();
let deserialized: EmailConfig = serde_json::from_str(&json).unwrap();
assert_eq!(deserialized.smtp_host, cfg.smtp_host);
}
#[test]
fn test_message_builder() {
let msg = EmailMessage::new()
.to("alice@ex.com")
.to("bob@ex.com")
.cc("cc@ex.com")
.bcc("bcc@ex.com")
.subject("Test")
.body("Hello")
.html_body("<p>Hello</p>");
assert_eq!(msg.to.len(), 2);
assert_eq!(msg.cc.len(), 1);
assert_eq!(msg.bcc.len(), 1);
assert_eq!(msg.subject, "Test");
assert!(msg.html_body.is_some());
assert_eq!(msg.recipient_count(), 4);
}
#[test]
fn test_message_validate_ok() {
let msg = EmailMessage::new().to("a@b.com").subject("Hi").body("Body");
assert!(msg.validate().is_ok());
}
#[test]
fn test_message_validate_no_recipient() {
let msg = EmailMessage::new().subject("Hi");
assert!(msg.validate().is_err());
}
#[test]
fn test_message_validate_no_subject() {
let msg = EmailMessage::new().to("a@b.com");
assert!(msg.validate().is_err());
}
#[test]
fn test_message_serialization() {
let msg = EmailMessage::new().to("a@b.com").subject("S").body("B");
let json = serde_json::to_string(&msg).unwrap();
let deserialized: EmailMessage = serde_json::from_str(&json).unwrap();
assert_eq!(deserialized.subject, "S");
}
#[test]
fn test_attachment_new() {
let att = Attachment::new("file.pdf", "application/pdf", vec![0, 1, 2]);
assert_eq!(att.name, "file.pdf");
assert_eq!(att.data.len(), 3);
}
#[test]
fn test_template_render() {
let tmpl = EmailTemplate::new("Hello {{name}}, welcome to {{app}}!")
.variable("name", "Alice")
.variable("app", "Hiver");
let rendered = tmpl.render().unwrap();
assert_eq!(rendered, "Hello Alice, welcome to Hiver!");
}
#[test]
fn test_template_unknown_variable_unchanged() {
let tmpl = EmailTemplate::new("Hello {{name}}! {{unknown}}").variable("name", "Bob");
let rendered = tmpl.render().unwrap();
assert_eq!(rendered, "Hello Bob! {{unknown}}");
}
#[test]
fn test_template_no_variables() {
let tmpl = EmailTemplate::new("Static content");
assert_eq!(tmpl.render().unwrap(), "Static content");
}
#[test]
fn test_template_render_message() {
let tmpl = EmailTemplate::new("Hi {{user}}, code: {{code}}")
.variable("user", "Eve")
.variable("code", "12345");
let msg = tmpl.render_message("eve@ex.com", "Your code").unwrap();
assert_eq!(msg.to[0], "eve@ex.com");
assert_eq!(msg.subject, "Your code");
assert!(msg.body.contains("Eve"));
assert!(msg.body.contains("12345"));
}
#[test]
fn test_template_multiple_same_variable() {
let tmpl = EmailTemplate::new("{{x}} and {{x}}").variable("x", "val");
let rendered = tmpl.render().unwrap();
assert_eq!(rendered, "val and val");
}
#[test]
fn test_template_set_variables_batch() {
let mut vars = HashMap::new();
vars.insert("a".to_string(), "1".to_string());
vars.insert("b".to_string(), "2".to_string());
let tmpl = EmailTemplate::new("{{a}}-{{b}}").variables(vars);
assert_eq!(tmpl.render().unwrap(), "1-2");
}
#[tokio::test]
async fn test_smtp_sender_connection_failure() {
let cfg = EmailConfig::new("smtp.nonexistent.invalid", 587, "u", "p", "from@host");
let sender = SmtpEmailSender::new(cfg);
let msg = EmailMessage::new().to("a@b.com").subject("S").body("B");
let err = sender.send(msg).await.unwrap_err();
assert!(err.to_string().contains("SMTP"));
}
#[tokio::test]
async fn test_smtp_sender_invalid_config() {
let cfg = EmailConfig::new("", 587, "u", "p", "bad");
let sender = SmtpEmailSender::new(cfg);
let msg = EmailMessage::new().to("a@b.com").subject("S").body("B");
assert!(sender.send(msg).await.is_err());
}
#[tokio::test]
async fn test_smtp_sender_invalid_message() {
let cfg = EmailConfig::new("smtp.host", 587, "u", "p", "from@host");
let sender = SmtpEmailSender::new(cfg);
let msg = EmailMessage::new(); assert!(sender.send(msg).await.is_err());
}
#[test]
fn test_smtp_sender_config_access() {
let cfg = EmailConfig::new("h", 25, "u", "p", "f@h");
let sender = SmtpEmailSender::new(cfg);
assert_eq!(sender.config().smtp_host, "h");
}
#[tokio::test]
async fn test_queue_enqueue_and_len() {
let queue = EmailQueue::new();
assert!(queue.is_empty().await);
let msg = EmailMessage::new().to("a@b.com").subject("S").body("B");
queue.enqueue(msg).await;
assert_eq!(queue.len().await, 1);
}
#[tokio::test]
async fn test_queue_process_all_fail() {
let queue = EmailQueue::new();
let cfg = EmailConfig::new("smtp.nonexistent.invalid", 587, "u", "p", "from@host");
let sender = SmtpEmailSender::new(cfg);
for i in 0..3 {
let msg = EmailMessage::new()
.to(format!("a{}@b.com", i))
.subject(format!("S{}", i))
.body("B");
queue.enqueue(msg).await;
}
let sent = queue.process_queue(&sender).await.unwrap();
assert_eq!(sent, 0);
assert!(queue.is_empty().await);
}
#[tokio::test]
async fn test_queue_clear() {
let queue = EmailQueue::new();
let msg = EmailMessage::new().to("a@b.com").subject("S").body("B");
queue.enqueue(msg).await;
assert_eq!(queue.len().await, 1);
queue.clear().await;
assert!(queue.is_empty().await);
}
#[tokio::test]
async fn test_queue_process_with_failures() {
let queue = EmailQueue::new();
let bad_cfg = EmailConfig::new("", 587, "u", "p", "bad");
let sender = SmtpEmailSender::new(bad_cfg);
let msg = EmailMessage::new().to("a@b.com").subject("S").body("B");
queue.enqueue(msg).await;
let sent = queue.process_queue(&sender).await.unwrap();
assert_eq!(sent, 0);
assert!(queue.is_empty().await); }
#[tokio::test]
async fn test_queue_process_empty() {
let queue = EmailQueue::new();
let cfg = EmailConfig::new("smtp.host", 587, "u", "p", "from@host");
let sender = SmtpEmailSender::new(cfg);
let sent = queue.process_queue(&sender).await.unwrap();
assert_eq!(sent, 0);
}
}