pub mod config;
mod error;
use std::{collections::HashSet, sync::Arc};
use async_trait::async_trait;
use futures::lock::Mutex;
use mail_parser::{Addr, Address, HeaderName, HeaderValue, Message, MessageParser};
use mail_send::{
smtp::message::{Address as SmtpAddress, IntoMessage, Message as SmtpMessage},
SmtpClientBuilder,
};
#[cfg(feature = "tokio")]
use tokio::net::TcpStream;
#[cfg(feature = "tokio-native-tls")]
use tokio_native_tls::TlsStream;
#[cfg(feature = "tokio-rustls")]
use tokio_rustls::client::TlsStream;
use tracing::{debug, info, warn};
use self::config::{SmtpAuthConfig, SmtpConfig};
#[doc(inline)]
pub use self::error::{Error, Result};
use crate::{
account::config::AccountConfig,
backend::{
context::{BackendContext, BackendContextBuilder},
feature::{BackendFeature, CheckUp},
},
message::send::{smtp::SendSmtpMessage, SendMessage},
retry::{Retry, RetryState},
AnyResult,
};
pub struct SmtpContext {
pub account_config: Arc<AccountConfig>,
pub smtp_config: Arc<SmtpConfig>,
client_builder: mail_send::SmtpClientBuilder<String>,
client: SmtpClientStream,
}
impl SmtpContext {
pub async fn send(&mut self, msg: &[u8]) -> Result<()> {
let buffer: Vec<u8>;
let mut msg = MessageParser::new().parse(msg).unwrap_or_else(|| {
debug!("cannot parse raw email message");
Default::default()
});
if let Some(cmd) = self.account_config.find_message_pre_send_hook() {
match cmd.run_with(msg.raw_message()).await {
Ok(res) => {
buffer = res.into();
msg = MessageParser::new().parse(&buffer).unwrap_or_else(|| {
debug!("cannot parse email raw message");
Default::default()
});
}
Err(_err) => {
debug!("cannot execute pre-send hook: {_err}");
debug!("{_err:?}");
}
}
};
let mut retry = Retry::default();
loop {
let msg = into_smtp_msg(msg.clone())?;
match retry.next(retry.timeout(self.client.send(msg)).await) {
RetryState::Retry => {
debug!(attempt = retry.attempts, "request timed out");
continue;
}
RetryState::TimedOut => {
break Err(Error::SendMessageTimedOutError);
}
RetryState::Ok(Ok(res)) => {
break Ok(res);
}
RetryState::Ok(Err(err)) => {
match err {
mail_send::Error::Timeout => {
warn!("connection timed out");
}
mail_send::Error::Io(err) => {
let reason = err.to_string();
warn!(reason, "connection broke");
}
mail_send::Error::UnexpectedReply(reply) => {
let reason = reply.message;
let code = reply.code;
warn!(reason, "server replied with code {code}");
}
err => {
break Err(Error::SendMessageError(err));
}
};
debug!("re-connecting…");
self.client = if self.smtp_config.is_encryption_enabled() {
build_tls_client(&self.client_builder).await
} else {
build_tcp_client(&self.client_builder).await
}?;
retry.reset();
continue;
}
}
}
}
pub async fn noop(&mut self) -> Result<()> {
self.client.noop().await
}
}
pub type SmtpContextSync = Arc<Mutex<SmtpContext>>;
impl BackendContext for SmtpContextSync {}
#[derive(Clone)]
pub struct SmtpContextBuilder {
pub account_config: Arc<AccountConfig>,
smtp_config: Arc<SmtpConfig>,
}
impl SmtpContextBuilder {
pub fn new(account_config: Arc<AccountConfig>, smtp_config: Arc<SmtpConfig>) -> Self {
Self {
account_config,
smtp_config,
}
}
}
#[async_trait]
impl BackendContextBuilder for SmtpContextBuilder {
type Context = SmtpContextSync;
fn check_up(&self) -> Option<BackendFeature<Self::Context, dyn CheckUp>> {
Some(Arc::new(CheckUpSmtp::some_new_boxed))
}
fn send_message(&self) -> Option<BackendFeature<Self::Context, dyn SendMessage>> {
Some(Arc::new(SendSmtpMessage::some_new_boxed))
}
async fn build(self) -> AnyResult<Self::Context> {
info!("building new smtp context");
let mut client_builder =
SmtpClientBuilder::new(self.smtp_config.host.clone(), self.smtp_config.port)
.credentials(self.smtp_config.credentials().await?)
.implicit_tls(!self.smtp_config.is_start_tls_encryption_enabled());
if self.smtp_config.is_encryption_disabled() {
client_builder = client_builder.allow_invalid_certs();
}
let (client_builder, client) = build_client(&self.smtp_config, client_builder).await?;
let ctx = SmtpContext {
account_config: self.account_config,
smtp_config: self.smtp_config,
client_builder,
client,
};
Ok(Arc::new(Mutex::new(ctx)))
}
}
pub enum SmtpClientStream {
Tcp(mail_send::SmtpClient<TcpStream>),
Tls(mail_send::SmtpClient<TlsStream<TcpStream>>),
}
impl SmtpClientStream {
pub async fn send(&mut self, msg: impl IntoMessage<'_>) -> mail_send::Result<()> {
match self {
Self::Tcp(client) => client.send(msg).await,
Self::Tls(client) => client.send(msg).await,
}
}
pub async fn noop(&mut self) -> Result<()> {
match self {
Self::Tcp(client) => client.noop().await.map_err(Error::MailSendNoOpFailed),
Self::Tls(client) => client.noop().await.map_err(Error::MailSendNoOpFailed),
}
}
}
#[derive(Clone)]
pub struct CheckUpSmtp {
ctx: SmtpContextSync,
}
impl CheckUpSmtp {
pub fn new(ctx: &SmtpContextSync) -> Self {
Self { ctx: ctx.clone() }
}
pub fn new_boxed(ctx: &SmtpContextSync) -> Box<dyn CheckUp> {
Box::new(Self::new(ctx))
}
pub fn some_new_boxed(ctx: &SmtpContextSync) -> Option<Box<dyn CheckUp>> {
Some(Self::new_boxed(ctx))
}
}
#[async_trait]
impl CheckUp for CheckUpSmtp {
async fn check_up(&self) -> AnyResult<()> {
let mut ctx = self.ctx.lock().await;
Ok(ctx.noop().await?)
}
}
pub async fn build_client(
smtp_config: &SmtpConfig,
#[cfg_attr(not(feature = "oauth2"), allow(unused_mut))]
mut client_builder: mail_send::SmtpClientBuilder<String>,
) -> Result<(mail_send::SmtpClientBuilder<String>, SmtpClientStream)> {
match (&smtp_config.auth, smtp_config.is_encryption_enabled()) {
(SmtpAuthConfig::Password(_), false) => {
let client = build_tcp_client(&client_builder).await?;
Ok((client_builder, client))
}
(SmtpAuthConfig::Password(_), true) => {
let client = build_tls_client(&client_builder).await?;
Ok((client_builder, client))
}
#[cfg(feature = "oauth2")]
(SmtpAuthConfig::OAuth2(oauth2_config), false) => {
match Ok(build_tcp_client(&client_builder).await?) {
Ok(client) => Ok((client_builder, client)),
Err(Error::ConnectTcpSmtpError(mail_send::Error::AuthenticationFailed(_))) => {
warn!("authentication failed, refreshing access token and retrying…");
oauth2_config
.refresh_access_token()
.await
.map_err(|_| Error::RefreshingAccessTokenFailed)?;
client_builder = client_builder.credentials(smtp_config.credentials().await?);
let client = build_tcp_client(&client_builder).await?;
Ok((client_builder, client))
}
Err(err) => Err(err),
}
}
#[cfg(feature = "oauth2")]
(SmtpAuthConfig::OAuth2(oauth2_config), true) => {
match Ok(build_tls_client(&client_builder).await?) {
Ok(client) => Ok((client_builder, client)),
Err(Error::ConnectTlsSmtpError(mail_send::Error::AuthenticationFailed(_))) => {
warn!("authentication failed, refreshing access token and retrying…");
oauth2_config
.refresh_access_token()
.await
.map_err(|_| Error::RefreshingAccessTokenFailed)?;
client_builder = client_builder.credentials(smtp_config.credentials().await?);
let client = build_tls_client(&client_builder).await?;
Ok((client_builder, client))
}
Err(err) => Err(err),
}
}
}
}
pub async fn build_tcp_client(
client_builder: &mail_send::SmtpClientBuilder<String>,
) -> Result<SmtpClientStream> {
match client_builder.connect_plain().await {
Ok(client) => Ok(SmtpClientStream::Tcp(client)),
Err(err) => Err(Error::ConnectTcpSmtpError(err)),
}
}
pub async fn build_tls_client(
client_builder: &mail_send::SmtpClientBuilder<String>,
) -> Result<SmtpClientStream> {
match client_builder.connect().await {
Ok(client) => Ok(SmtpClientStream::Tls(client)),
Err(err) => Err(Error::ConnectTlsSmtpError(err)),
}
}
fn into_smtp_msg(msg: Message<'_>) -> Result<SmtpMessage<'_>> {
let mut mail_from = None;
let mut rcpt_to = HashSet::new();
for header in msg.headers() {
let key = &header.name;
let val = header.value();
match key {
HeaderName::From => match val {
HeaderValue::Address(Address::List(addrs)) => {
if let Some(email) = addrs.first().and_then(find_valid_email) {
mail_from = email.to_string().into();
}
}
HeaderValue::Address(Address::Group(groups)) => {
if let Some(group) = groups.first() {
if let Some(email) = group.addresses.first().and_then(find_valid_email) {
mail_from = email.to_string().into();
}
}
}
_ => (),
},
HeaderName::To | HeaderName::Cc | HeaderName::Bcc => match val {
HeaderValue::Address(Address::List(addrs)) => {
rcpt_to.extend(addrs.iter().filter_map(find_valid_email));
}
HeaderValue::Address(Address::Group(groups)) => {
rcpt_to.extend(
groups
.iter()
.flat_map(|group| group.addresses.iter())
.filter_map(find_valid_email),
);
}
_ => (),
},
_ => (),
};
}
if rcpt_to.is_empty() {
return Err(Error::SendMessageMissingRecipientError);
}
let msg = SmtpMessage {
mail_from: mail_from
.ok_or(Error::SendMessageMissingSenderError)?
.into(),
rcpt_to: rcpt_to
.into_iter()
.map(|email| SmtpAddress {
email: email.into(),
..Default::default()
})
.collect(),
body: msg.raw_message,
};
Ok(msg)
}
fn find_valid_email(addr: &Addr) -> Option<String> {
match &addr.address {
None => None,
Some(email) => {
let email = email.trim();
if email.is_empty() {
None
} else {
Some(email.to_string())
}
}
}
}