email/smtp/
mod.rs

1pub mod config;
2mod error;
3
4use std::{collections::HashSet, sync::Arc};
5
6use async_trait::async_trait;
7use futures::lock::Mutex;
8use mail_parser::{Addr, Address, HeaderName, HeaderValue, Message, MessageParser};
9use mail_send::{
10    smtp::message::{Address as SmtpAddress, IntoMessage, Message as SmtpMessage},
11    SmtpClientBuilder,
12};
13#[cfg(feature = "tokio")]
14use tokio::net::TcpStream;
15#[cfg(feature = "tokio-native-tls")]
16use tokio_native_tls::TlsStream;
17#[cfg(feature = "tokio-rustls")]
18use tokio_rustls::client::TlsStream;
19use tracing::{debug, info, warn};
20
21use self::config::{SmtpAuthConfig, SmtpConfig};
22#[doc(inline)]
23pub use self::error::{Error, Result};
24use crate::{
25    account::config::AccountConfig,
26    backend::{
27        context::{BackendContext, BackendContextBuilder},
28        feature::{BackendFeature, CheckUp},
29    },
30    message::send::{smtp::SendSmtpMessage, SendMessage},
31    retry::{Retry, RetryState},
32    AnyResult,
33};
34
35/// The SMTP backend context.
36///
37/// This context is unsync, which means it cannot be shared between
38/// threads. For the sync version, see [`SmtpContextSync`].
39pub struct SmtpContext {
40    /// The account configuration.
41    pub account_config: Arc<AccountConfig>,
42
43    /// The SMTP configuration.
44    pub smtp_config: Arc<SmtpConfig>,
45
46    /// The SMTP client builder.
47    client_builder: mail_send::SmtpClientBuilder<String>,
48
49    /// The SMTP client.
50    client: SmtpClientStream,
51}
52
53impl SmtpContext {
54    pub async fn send(&mut self, msg: &[u8]) -> Result<()> {
55        let buffer: Vec<u8>;
56
57        let mut msg = MessageParser::new().parse(msg).unwrap_or_else(|| {
58            debug!("cannot parse raw email message");
59            Default::default()
60        });
61
62        if let Some(cmd) = self.account_config.find_message_pre_send_hook() {
63            match cmd.run_with(msg.raw_message()).await {
64                Ok(res) => {
65                    buffer = res.into();
66                    msg = MessageParser::new().parse(&buffer).unwrap_or_else(|| {
67                        debug!("cannot parse email raw message");
68                        Default::default()
69                    });
70                }
71                Err(_err) => {
72                    debug!("cannot execute pre-send hook: {_err}");
73                    debug!("{_err:?}");
74                }
75            }
76        };
77
78        let mut retry = Retry::default();
79
80        loop {
81            // NOTE: cannot clone the final message
82            let msg = into_smtp_msg(msg.clone())?;
83
84            match retry.next(retry.timeout(self.client.send(msg)).await) {
85                RetryState::Retry => {
86                    debug!(attempt = retry.attempts, "request timed out");
87                    continue;
88                }
89                RetryState::TimedOut => {
90                    break Err(Error::SendMessageTimedOutError);
91                }
92                RetryState::Ok(Ok(res)) => {
93                    break Ok(res);
94                }
95                RetryState::Ok(Err(err)) => {
96                    match err {
97                        mail_send::Error::Timeout => {
98                            warn!("connection timed out");
99                        }
100                        mail_send::Error::Io(err) => {
101                            let reason = err.to_string();
102                            warn!(reason, "connection broke");
103                        }
104                        mail_send::Error::UnexpectedReply(reply) => {
105                            let reason = reply.message;
106                            let code = reply.code;
107                            warn!(reason, "server replied with code {code}");
108                        }
109                        err => {
110                            break Err(Error::SendMessageError(err));
111                        }
112                    };
113
114                    debug!("re-connecting…");
115
116                    self.client = if self.smtp_config.is_encryption_enabled() {
117                        build_tls_client(&self.client_builder).await
118                    } else {
119                        build_tcp_client(&self.client_builder).await
120                    }?;
121
122                    retry.reset();
123                    continue;
124                }
125            }
126        }
127    }
128
129    pub async fn noop(&mut self) -> Result<()> {
130        self.client.noop().await
131    }
132}
133
134/// The sync version of the SMTP backend context.
135///
136/// This is just an SMTP client wrapped into a mutex, so the same SMTP
137/// client can be shared and updated across multiple threads.
138pub type SmtpContextSync = Arc<Mutex<SmtpContext>>;
139
140impl BackendContext for SmtpContextSync {}
141
142/// The SMTP client builder.
143#[derive(Clone)]
144pub struct SmtpContextBuilder {
145    /// The account configuration.
146    pub account_config: Arc<AccountConfig>,
147
148    /// The SMTP configuration.
149    smtp_config: Arc<SmtpConfig>,
150}
151
152impl SmtpContextBuilder {
153    pub fn new(account_config: Arc<AccountConfig>, smtp_config: Arc<SmtpConfig>) -> Self {
154        Self {
155            account_config,
156            smtp_config,
157        }
158    }
159}
160
161#[async_trait]
162impl BackendContextBuilder for SmtpContextBuilder {
163    type Context = SmtpContextSync;
164
165    fn check_up(&self) -> Option<BackendFeature<Self::Context, dyn CheckUp>> {
166        Some(Arc::new(CheckUpSmtp::some_new_boxed))
167    }
168
169    fn send_message(&self) -> Option<BackendFeature<Self::Context, dyn SendMessage>> {
170        Some(Arc::new(SendSmtpMessage::some_new_boxed))
171    }
172
173    /// Build an SMTP sync client.
174    ///
175    /// The SMTP client is created at this moment. If the client
176    /// cannot be created using the OAuth 2.0 authentication, the
177    /// access token is refreshed first then a new client is created.
178    async fn build(self) -> AnyResult<Self::Context> {
179        info!("building new smtp context");
180
181        let mut client_builder =
182            SmtpClientBuilder::new(self.smtp_config.host.clone(), self.smtp_config.port)
183                .credentials(self.smtp_config.credentials().await?)
184                .implicit_tls(!self.smtp_config.is_start_tls_encryption_enabled());
185
186        if self.smtp_config.is_encryption_disabled() {
187            client_builder = client_builder.allow_invalid_certs();
188        }
189
190        let (client_builder, client) = build_client(&self.smtp_config, client_builder).await?;
191
192        let ctx = SmtpContext {
193            account_config: self.account_config,
194            smtp_config: self.smtp_config,
195            client_builder,
196            client,
197        };
198
199        Ok(Arc::new(Mutex::new(ctx)))
200    }
201}
202
203pub enum SmtpClientStream {
204    Tcp(mail_send::SmtpClient<TcpStream>),
205    Tls(mail_send::SmtpClient<TlsStream<TcpStream>>),
206}
207
208impl SmtpClientStream {
209    pub async fn send(&mut self, msg: impl IntoMessage<'_>) -> mail_send::Result<()> {
210        match self {
211            Self::Tcp(client) => client.send(msg).await,
212            Self::Tls(client) => client.send(msg).await,
213        }
214    }
215
216    pub async fn noop(&mut self) -> Result<()> {
217        match self {
218            Self::Tcp(client) => client.noop().await.map_err(Error::MailSendNoOpFailed),
219            Self::Tls(client) => client.noop().await.map_err(Error::MailSendNoOpFailed),
220        }
221    }
222}
223
224#[derive(Clone)]
225pub struct CheckUpSmtp {
226    ctx: SmtpContextSync,
227}
228
229impl CheckUpSmtp {
230    pub fn new(ctx: &SmtpContextSync) -> Self {
231        Self { ctx: ctx.clone() }
232    }
233
234    pub fn new_boxed(ctx: &SmtpContextSync) -> Box<dyn CheckUp> {
235        Box::new(Self::new(ctx))
236    }
237
238    pub fn some_new_boxed(ctx: &SmtpContextSync) -> Option<Box<dyn CheckUp>> {
239        Some(Self::new_boxed(ctx))
240    }
241}
242
243#[async_trait]
244impl CheckUp for CheckUpSmtp {
245    async fn check_up(&self) -> AnyResult<()> {
246        let mut ctx = self.ctx.lock().await;
247        Ok(ctx.noop().await?)
248    }
249}
250
251pub async fn build_client(
252    smtp_config: &SmtpConfig,
253    #[cfg_attr(not(feature = "oauth2"), allow(unused_mut))]
254    mut client_builder: mail_send::SmtpClientBuilder<String>,
255) -> Result<(mail_send::SmtpClientBuilder<String>, SmtpClientStream)> {
256    match (&smtp_config.auth, smtp_config.is_encryption_enabled()) {
257        (SmtpAuthConfig::Password(_), false) => {
258            let client = build_tcp_client(&client_builder).await?;
259            Ok((client_builder, client))
260        }
261        (SmtpAuthConfig::Password(_), true) => {
262            let client = build_tls_client(&client_builder).await?;
263            Ok((client_builder, client))
264        }
265        #[cfg(feature = "oauth2")]
266        (SmtpAuthConfig::OAuth2(oauth2_config), false) => {
267            match Ok(build_tcp_client(&client_builder).await?) {
268                Ok(client) => Ok((client_builder, client)),
269                Err(Error::ConnectTcpSmtpError(mail_send::Error::AuthenticationFailed(_))) => {
270                    warn!("authentication failed, refreshing access token and retrying…");
271                    oauth2_config
272                        .refresh_access_token()
273                        .await
274                        .map_err(|_| Error::RefreshingAccessTokenFailed)?;
275                    client_builder = client_builder.credentials(smtp_config.credentials().await?);
276                    let client = build_tcp_client(&client_builder).await?;
277                    Ok((client_builder, client))
278                }
279                Err(err) => Err(err),
280            }
281        }
282        #[cfg(feature = "oauth2")]
283        (SmtpAuthConfig::OAuth2(oauth2_config), true) => {
284            match Ok(build_tls_client(&client_builder).await?) {
285                Ok(client) => Ok((client_builder, client)),
286                Err(Error::ConnectTlsSmtpError(mail_send::Error::AuthenticationFailed(_))) => {
287                    warn!("authentication failed, refreshing access token and retrying…");
288                    oauth2_config
289                        .refresh_access_token()
290                        .await
291                        .map_err(|_| Error::RefreshingAccessTokenFailed)?;
292                    client_builder = client_builder.credentials(smtp_config.credentials().await?);
293                    let client = build_tls_client(&client_builder).await?;
294                    Ok((client_builder, client))
295                }
296                Err(err) => Err(err),
297            }
298        }
299    }
300}
301
302pub async fn build_tcp_client(
303    client_builder: &mail_send::SmtpClientBuilder<String>,
304) -> Result<SmtpClientStream> {
305    match client_builder.connect_plain().await {
306        Ok(client) => Ok(SmtpClientStream::Tcp(client)),
307        Err(err) => Err(Error::ConnectTcpSmtpError(err)),
308    }
309}
310
311pub async fn build_tls_client(
312    client_builder: &mail_send::SmtpClientBuilder<String>,
313) -> Result<SmtpClientStream> {
314    match client_builder.connect().await {
315        Ok(client) => Ok(SmtpClientStream::Tls(client)),
316        Err(err) => Err(Error::ConnectTlsSmtpError(err)),
317    }
318}
319
320/// Transform a [`mail_parser::Message`] into a
321/// [`mail_send::smtp::message::Message`].
322///
323/// This function returns an error if no sender or no recipient is
324/// found in the original message.
325fn into_smtp_msg(msg: Message<'_>) -> Result<SmtpMessage<'_>> {
326    let mut mail_from = None;
327    let mut rcpt_to = HashSet::new();
328
329    for header in msg.headers() {
330        let key = &header.name;
331        let val = header.value();
332
333        match key {
334            HeaderName::From => match val {
335                HeaderValue::Address(Address::List(addrs)) => {
336                    if let Some(email) = addrs.first().and_then(find_valid_email) {
337                        mail_from = email.to_string().into();
338                    }
339                }
340                HeaderValue::Address(Address::Group(groups)) => {
341                    if let Some(group) = groups.first() {
342                        if let Some(email) = group.addresses.first().and_then(find_valid_email) {
343                            mail_from = email.to_string().into();
344                        }
345                    }
346                }
347                _ => (),
348            },
349            HeaderName::To | HeaderName::Cc | HeaderName::Bcc => match val {
350                HeaderValue::Address(Address::List(addrs)) => {
351                    rcpt_to.extend(addrs.iter().filter_map(find_valid_email));
352                }
353                HeaderValue::Address(Address::Group(groups)) => {
354                    rcpt_to.extend(
355                        groups
356                            .iter()
357                            .flat_map(|group| group.addresses.iter())
358                            .filter_map(find_valid_email),
359                    );
360                }
361                _ => (),
362            },
363            _ => (),
364        };
365    }
366
367    if rcpt_to.is_empty() {
368        return Err(Error::SendMessageMissingRecipientError);
369    }
370
371    let msg = SmtpMessage {
372        mail_from: mail_from
373            .ok_or(Error::SendMessageMissingSenderError)?
374            .into(),
375        rcpt_to: rcpt_to
376            .into_iter()
377            .map(|email| SmtpAddress {
378                email: email.into(),
379                ..Default::default()
380            })
381            .collect(),
382        body: msg.raw_message,
383    };
384
385    Ok(msg)
386}
387
388fn find_valid_email(addr: &Addr) -> Option<String> {
389    match &addr.address {
390        None => None,
391        Some(email) => {
392            let email = email.trim();
393            if email.is_empty() {
394                None
395            } else {
396                Some(email.to_string())
397            }
398        }
399    }
400}