1pub mod config;
2mod error;
3
4use std::{collections::HashSet, sync::Arc};
5
6use async_trait::async_trait;
7use futures::lock::Mutex;
8use mail_parser::{Addr, Address, HeaderName, HeaderValue, Message, MessageParser};
9use mail_send::{
10 smtp::message::{Address as SmtpAddress, IntoMessage, Message as SmtpMessage},
11 SmtpClientBuilder,
12};
13#[cfg(feature = "tokio")]
14use tokio::net::TcpStream;
15#[cfg(feature = "tokio-native-tls")]
16use tokio_native_tls::TlsStream;
17#[cfg(feature = "tokio-rustls")]
18use tokio_rustls::client::TlsStream;
19use tracing::{debug, info, warn};
20
21use self::config::{SmtpAuthConfig, SmtpConfig};
22#[doc(inline)]
23pub use self::error::{Error, Result};
24use crate::{
25 account::config::AccountConfig,
26 backend::{
27 context::{BackendContext, BackendContextBuilder},
28 feature::{BackendFeature, CheckUp},
29 },
30 message::send::{smtp::SendSmtpMessage, SendMessage},
31 retry::{Retry, RetryState},
32 AnyResult,
33};
34
35pub struct SmtpContext {
40 pub account_config: Arc<AccountConfig>,
42
43 pub smtp_config: Arc<SmtpConfig>,
45
46 client_builder: mail_send::SmtpClientBuilder<String>,
48
49 client: SmtpClientStream,
51}
52
53impl SmtpContext {
54 pub async fn send(&mut self, msg: &[u8]) -> Result<()> {
55 let buffer: Vec<u8>;
56
57 let mut msg = MessageParser::new().parse(msg).unwrap_or_else(|| {
58 debug!("cannot parse raw email message");
59 Default::default()
60 });
61
62 if let Some(cmd) = self.account_config.find_message_pre_send_hook() {
63 match cmd.run_with(msg.raw_message()).await {
64 Ok(res) => {
65 buffer = res.into();
66 msg = MessageParser::new().parse(&buffer).unwrap_or_else(|| {
67 debug!("cannot parse email raw message");
68 Default::default()
69 });
70 }
71 Err(_err) => {
72 debug!("cannot execute pre-send hook: {_err}");
73 debug!("{_err:?}");
74 }
75 }
76 };
77
78 let mut retry = Retry::default();
79
80 loop {
81 let msg = into_smtp_msg(msg.clone())?;
83
84 match retry.next(retry.timeout(self.client.send(msg)).await) {
85 RetryState::Retry => {
86 debug!(attempt = retry.attempts, "request timed out");
87 continue;
88 }
89 RetryState::TimedOut => {
90 break Err(Error::SendMessageTimedOutError);
91 }
92 RetryState::Ok(Ok(res)) => {
93 break Ok(res);
94 }
95 RetryState::Ok(Err(err)) => {
96 match err {
97 mail_send::Error::Timeout => {
98 warn!("connection timed out");
99 }
100 mail_send::Error::Io(err) => {
101 let reason = err.to_string();
102 warn!(reason, "connection broke");
103 }
104 mail_send::Error::UnexpectedReply(reply) => {
105 let reason = reply.message;
106 let code = reply.code;
107 warn!(reason, "server replied with code {code}");
108 }
109 err => {
110 break Err(Error::SendMessageError(err));
111 }
112 };
113
114 debug!("re-connecting…");
115
116 self.client = if self.smtp_config.is_encryption_enabled() {
117 build_tls_client(&self.client_builder).await
118 } else {
119 build_tcp_client(&self.client_builder).await
120 }?;
121
122 retry.reset();
123 continue;
124 }
125 }
126 }
127 }
128
129 pub async fn noop(&mut self) -> Result<()> {
130 self.client.noop().await
131 }
132}
133
134pub type SmtpContextSync = Arc<Mutex<SmtpContext>>;
139
140impl BackendContext for SmtpContextSync {}
141
142#[derive(Clone)]
144pub struct SmtpContextBuilder {
145 pub account_config: Arc<AccountConfig>,
147
148 smtp_config: Arc<SmtpConfig>,
150}
151
152impl SmtpContextBuilder {
153 pub fn new(account_config: Arc<AccountConfig>, smtp_config: Arc<SmtpConfig>) -> Self {
154 Self {
155 account_config,
156 smtp_config,
157 }
158 }
159}
160
161#[async_trait]
162impl BackendContextBuilder for SmtpContextBuilder {
163 type Context = SmtpContextSync;
164
165 fn check_up(&self) -> Option<BackendFeature<Self::Context, dyn CheckUp>> {
166 Some(Arc::new(CheckUpSmtp::some_new_boxed))
167 }
168
169 fn send_message(&self) -> Option<BackendFeature<Self::Context, dyn SendMessage>> {
170 Some(Arc::new(SendSmtpMessage::some_new_boxed))
171 }
172
173 async fn build(self) -> AnyResult<Self::Context> {
179 info!("building new smtp context");
180
181 let mut client_builder =
182 SmtpClientBuilder::new(self.smtp_config.host.clone(), self.smtp_config.port)
183 .credentials(self.smtp_config.credentials().await?)
184 .implicit_tls(!self.smtp_config.is_start_tls_encryption_enabled());
185
186 if self.smtp_config.is_encryption_disabled() {
187 client_builder = client_builder.allow_invalid_certs();
188 }
189
190 let (client_builder, client) = build_client(&self.smtp_config, client_builder).await?;
191
192 let ctx = SmtpContext {
193 account_config: self.account_config,
194 smtp_config: self.smtp_config,
195 client_builder,
196 client,
197 };
198
199 Ok(Arc::new(Mutex::new(ctx)))
200 }
201}
202
203pub enum SmtpClientStream {
204 Tcp(mail_send::SmtpClient<TcpStream>),
205 Tls(mail_send::SmtpClient<TlsStream<TcpStream>>),
206}
207
208impl SmtpClientStream {
209 pub async fn send(&mut self, msg: impl IntoMessage<'_>) -> mail_send::Result<()> {
210 match self {
211 Self::Tcp(client) => client.send(msg).await,
212 Self::Tls(client) => client.send(msg).await,
213 }
214 }
215
216 pub async fn noop(&mut self) -> Result<()> {
217 match self {
218 Self::Tcp(client) => client.noop().await.map_err(Error::MailSendNoOpFailed),
219 Self::Tls(client) => client.noop().await.map_err(Error::MailSendNoOpFailed),
220 }
221 }
222}
223
224#[derive(Clone)]
225pub struct CheckUpSmtp {
226 ctx: SmtpContextSync,
227}
228
229impl CheckUpSmtp {
230 pub fn new(ctx: &SmtpContextSync) -> Self {
231 Self { ctx: ctx.clone() }
232 }
233
234 pub fn new_boxed(ctx: &SmtpContextSync) -> Box<dyn CheckUp> {
235 Box::new(Self::new(ctx))
236 }
237
238 pub fn some_new_boxed(ctx: &SmtpContextSync) -> Option<Box<dyn CheckUp>> {
239 Some(Self::new_boxed(ctx))
240 }
241}
242
243#[async_trait]
244impl CheckUp for CheckUpSmtp {
245 async fn check_up(&self) -> AnyResult<()> {
246 let mut ctx = self.ctx.lock().await;
247 Ok(ctx.noop().await?)
248 }
249}
250
251pub async fn build_client(
252 smtp_config: &SmtpConfig,
253 #[cfg_attr(not(feature = "oauth2"), allow(unused_mut))]
254 mut client_builder: mail_send::SmtpClientBuilder<String>,
255) -> Result<(mail_send::SmtpClientBuilder<String>, SmtpClientStream)> {
256 match (&smtp_config.auth, smtp_config.is_encryption_enabled()) {
257 (SmtpAuthConfig::Password(_), false) => {
258 let client = build_tcp_client(&client_builder).await?;
259 Ok((client_builder, client))
260 }
261 (SmtpAuthConfig::Password(_), true) => {
262 let client = build_tls_client(&client_builder).await?;
263 Ok((client_builder, client))
264 }
265 #[cfg(feature = "oauth2")]
266 (SmtpAuthConfig::OAuth2(oauth2_config), false) => {
267 match Ok(build_tcp_client(&client_builder).await?) {
268 Ok(client) => Ok((client_builder, client)),
269 Err(Error::ConnectTcpSmtpError(mail_send::Error::AuthenticationFailed(_))) => {
270 warn!("authentication failed, refreshing access token and retrying…");
271 oauth2_config
272 .refresh_access_token()
273 .await
274 .map_err(|_| Error::RefreshingAccessTokenFailed)?;
275 client_builder = client_builder.credentials(smtp_config.credentials().await?);
276 let client = build_tcp_client(&client_builder).await?;
277 Ok((client_builder, client))
278 }
279 Err(err) => Err(err),
280 }
281 }
282 #[cfg(feature = "oauth2")]
283 (SmtpAuthConfig::OAuth2(oauth2_config), true) => {
284 match Ok(build_tls_client(&client_builder).await?) {
285 Ok(client) => Ok((client_builder, client)),
286 Err(Error::ConnectTlsSmtpError(mail_send::Error::AuthenticationFailed(_))) => {
287 warn!("authentication failed, refreshing access token and retrying…");
288 oauth2_config
289 .refresh_access_token()
290 .await
291 .map_err(|_| Error::RefreshingAccessTokenFailed)?;
292 client_builder = client_builder.credentials(smtp_config.credentials().await?);
293 let client = build_tls_client(&client_builder).await?;
294 Ok((client_builder, client))
295 }
296 Err(err) => Err(err),
297 }
298 }
299 }
300}
301
302pub async fn build_tcp_client(
303 client_builder: &mail_send::SmtpClientBuilder<String>,
304) -> Result<SmtpClientStream> {
305 match client_builder.connect_plain().await {
306 Ok(client) => Ok(SmtpClientStream::Tcp(client)),
307 Err(err) => Err(Error::ConnectTcpSmtpError(err)),
308 }
309}
310
311pub async fn build_tls_client(
312 client_builder: &mail_send::SmtpClientBuilder<String>,
313) -> Result<SmtpClientStream> {
314 match client_builder.connect().await {
315 Ok(client) => Ok(SmtpClientStream::Tls(client)),
316 Err(err) => Err(Error::ConnectTlsSmtpError(err)),
317 }
318}
319
320fn into_smtp_msg(msg: Message<'_>) -> Result<SmtpMessage<'_>> {
326 let mut mail_from = None;
327 let mut rcpt_to = HashSet::new();
328
329 for header in msg.headers() {
330 let key = &header.name;
331 let val = header.value();
332
333 match key {
334 HeaderName::From => match val {
335 HeaderValue::Address(Address::List(addrs)) => {
336 if let Some(email) = addrs.first().and_then(find_valid_email) {
337 mail_from = email.to_string().into();
338 }
339 }
340 HeaderValue::Address(Address::Group(groups)) => {
341 if let Some(group) = groups.first() {
342 if let Some(email) = group.addresses.first().and_then(find_valid_email) {
343 mail_from = email.to_string().into();
344 }
345 }
346 }
347 _ => (),
348 },
349 HeaderName::To | HeaderName::Cc | HeaderName::Bcc => match val {
350 HeaderValue::Address(Address::List(addrs)) => {
351 rcpt_to.extend(addrs.iter().filter_map(find_valid_email));
352 }
353 HeaderValue::Address(Address::Group(groups)) => {
354 rcpt_to.extend(
355 groups
356 .iter()
357 .flat_map(|group| group.addresses.iter())
358 .filter_map(find_valid_email),
359 );
360 }
361 _ => (),
362 },
363 _ => (),
364 };
365 }
366
367 if rcpt_to.is_empty() {
368 return Err(Error::SendMessageMissingRecipientError);
369 }
370
371 let msg = SmtpMessage {
372 mail_from: mail_from
373 .ok_or(Error::SendMessageMissingSenderError)?
374 .into(),
375 rcpt_to: rcpt_to
376 .into_iter()
377 .map(|email| SmtpAddress {
378 email: email.into(),
379 ..Default::default()
380 })
381 .collect(),
382 body: msg.raw_message,
383 };
384
385 Ok(msg)
386}
387
388fn find_valid_email(addr: &Addr) -> Option<String> {
389 match &addr.address {
390 None => None,
391 Some(email) => {
392 let email = email.trim();
393 if email.is_empty() {
394 None
395 } else {
396 Some(email.to_string())
397 }
398 }
399 }
400}