1use std::{
2 cmp, collections::BTreeMap, fmt, future::Future, io, net::IpAddr, ops::Range, pin::Pin,
3 sync::Arc,
4};
5
6use async_trait::async_trait;
7use bitflags::bitflags;
8use chrono::Utc;
9use futures::{pin_mut, AsyncRead, AsyncReadExt, AsyncWrite, AsyncWriteExt};
10use rand::prelude::SliceRandom;
11use smol::net::TcpStream;
12use tracing::trace;
13use trust_dns_resolver::{
14 error::{ResolveError, ResolveErrorKind},
15 proto::error::ProtoError,
16 AsyncResolver, IntoName,
17};
18
19use smtp_message::{
20 nom, Command, Email, EnhancedReplyCodeSubject, Hostname, Parameters, Reply, ReplyCodeKind,
21};
22
/// TCP port used for outgoing SMTP connections.
const SMTP_PORT: u16 = 25;

/// Size, in bytes, of the per-connection buffer server replies are read into.
const RDBUF_SIZE: usize = 16 * 1024;
/// Size, in bytes, of the chunks in which message data is copied to the server.
const DATABUF_SIZE: usize = 16 * 1024;
/// Lower bound on the free tail space `read_reply` wants in the reply buffer
/// before it reads more bytes; below that it first moves pending data to the front.
const MINIMUM_FREE_BUFSPACE: usize = 128;

/// Fallback delay used when a `chrono::Duration` cannot be converted to a
/// `std::time::Duration` (i.e. it is negative): the timer fires immediately.
const ZERO_DURATION: std::time::Duration = std::time::Duration::from_secs(0);
30
31pub type DynAsyncReadWrite =
32 duplexify::Duplex<Pin<Box<dyn Send + AsyncRead>>, Pin<Box<dyn Send + AsyncWrite>>>;
33
34#[derive(Eq, Hash, PartialEq)]
35pub struct Destination {
36 host: Hostname,
37}
38
39impl fmt::Display for Destination {
40 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
41 self.host.fmt(f)
42 }
43}
44
45#[async_trait]
46pub trait Config {
47 fn ehlo_hostname(&self) -> Hostname<String>;
48
49 fn can_do_tls(&self) -> bool {
50 true
51 }
52
53 fn must_do_tls(&self) -> bool {
55 false
56 }
57
58 async fn tls_connect<IO>(&self, io: IO) -> io::Result<DynAsyncReadWrite>
60 where
61 IO: 'static + Unpin + Send + AsyncRead + AsyncWrite;
62
63 fn banner_read_timeout(&self) -> chrono::Duration {
64 chrono::Duration::minutes(5)
65 }
66
67 fn command_write_timeout(&self) -> chrono::Duration {
68 chrono::Duration::minutes(5)
69 }
70
71 fn ehlo_reply_timeout(&self) -> chrono::Duration {
72 chrono::Duration::minutes(5)
73 }
74
75 fn starttls_reply_timeout(&self) -> chrono::Duration {
76 chrono::Duration::minutes(2)
77 }
78
79 fn mail_reply_timeout(&self) -> chrono::Duration {
80 chrono::Duration::minutes(5)
81 }
82
83 fn rcpt_reply_timeout(&self) -> chrono::Duration {
84 chrono::Duration::minutes(5)
85 }
86
87 fn data_init_reply_timeout(&self) -> chrono::Duration {
88 chrono::Duration::minutes(2)
89 }
90
91 fn data_block_write_timeout(&self) -> chrono::Duration {
92 chrono::Duration::minutes(3)
93 }
94
95 fn data_end_reply_timeout(&self) -> chrono::Duration {
96 chrono::Duration::minutes(10)
97 }
98}
99
100#[derive(Debug, thiserror::Error)]
101pub enum TransportError {
102 #[error("Retrieving MX DNS records for ‘{0}’")]
103 DnsMx(String, #[source] ResolveError),
104
105 #[error("Converting hostname ‘{0}’ to to-be-resolved name")]
106 HostToTrustDns(String, #[source] ProtoError),
107
108 #[error("Retrieving IP DNS records for ‘{1}’")]
109 DnsIp(trust_dns_resolver::Name, #[source] ResolveError),
110
111 #[error("Connecting to ‘{0}’ port ‘{1}’")]
112 Connecting(IpAddr, u16, #[source] io::Error),
113
114 #[error("Receiving reply bytes")]
115 ReceivingReplyBytes(#[source] io::Error),
116
117 #[error("Timed out while waiting for a reply")]
118 TimedOutWaitingForReply,
119
120 #[error("Connection aborted")]
121 ConnectionAborted,
122
123 #[error("Reply does not fit in buffer: ‘{0}’")]
124 TooLongReply(String),
125
126 #[error("Syntax error parsing as a reply: ‘{0}’")]
127 SyntaxError(String),
128
129 #[error("Timed out while sending a command")]
130 TimedOutSendingCommand,
131
132 #[error("Sending command")]
133 SendingCommand(#[source] io::Error),
134
135 #[error("Negotiating TLS")]
136 NegotiatingTls(#[source] io::Error),
137
138 #[error("Cannot do TLS with remote server")]
139 CannotDoTls,
140
141 #[error("Mail-level transient issue: {0}")]
143 TransientMail(Reply),
144
145 #[error("Mailbox-level transient issue: {0}")]
146 TransientMailbox(Reply),
147
148 #[error("Mail system-level transient issue: {0}")]
149 TransientMailSystem(Reply),
150
151 #[error("Mail-level permanent issue: {0}")]
152 PermanentMail(Reply),
153
154 #[error("Mailbox-level permanent issue: {0}")]
155 PermanentMailbox(Reply),
156
157 #[error("Mail system-level permanent issue: {0}")]
158 PermanentMailSystem(Reply),
159
160 #[error("Unexpected reply code: {0}")]
161 UnexpectedReplyCode(Reply),
162
163 #[error("Timed out while sending data")]
164 TimedOutSendingData,
165
166 #[error("Sending data")]
167 SendingData(#[source] io::Error),
168
169 #[error("Reading the mail from the provided reader")]
170 ReadingMail(#[source] io::Error),
171}
172
/// Coarse classification of a `TransportError`, as returned by
/// `TransportError::severity`.
///
/// The `*Transient` / `*Permanent` pairs mirror the transient/permanent reply
/// variants of `TransportError`. The type is a plain field-less enum, so it is
/// cheap to copy, compare, hash and print.
#[derive(Clone, Copy, Debug, Eq, Hash, PartialEq)]
pub enum TransportErrorSeverity {
    /// The problem lies on the sending side (e.g. reading the message to send).
    Local,
    /// Network-level failure: DNS, connection, timeouts, I/O errors.
    NetworkTransient,
    /// Transient failure scoped to this mail.
    MailTransient,
    /// Transient failure scoped to the recipient mailbox.
    MailboxTransient,
    /// Transient failure scoped to the remote mail system.
    MailSystemTransient,
    /// Permanent failure scoped to this mail.
    MailPermanent,
    /// Permanent failure scoped to the recipient mailbox.
    MailboxPermanent,
    /// Permanent failure scoped to the remote mail system.
    MailSystemPermanent,
}
183
184impl TransportError {
185 pub fn severity(&self) -> TransportErrorSeverity {
186 match self {
190 TransportError::DnsMx(_, _) => TransportErrorSeverity::NetworkTransient,
191 TransportError::HostToTrustDns(_, _) => TransportErrorSeverity::Local,
192 TransportError::DnsIp(_, _) => TransportErrorSeverity::NetworkTransient,
193 TransportError::Connecting(_, _, _) => TransportErrorSeverity::NetworkTransient,
194 TransportError::ReceivingReplyBytes(_) => TransportErrorSeverity::NetworkTransient,
195 TransportError::TimedOutWaitingForReply => TransportErrorSeverity::NetworkTransient,
196 TransportError::ConnectionAborted => TransportErrorSeverity::NetworkTransient,
197 TransportError::TooLongReply(_) => TransportErrorSeverity::NetworkTransient,
198 TransportError::SyntaxError(_) => TransportErrorSeverity::MailSystemTransient,
199 TransportError::TimedOutSendingCommand => TransportErrorSeverity::NetworkTransient,
200 TransportError::SendingCommand(_) => TransportErrorSeverity::NetworkTransient,
201 TransportError::NegotiatingTls(_) => TransportErrorSeverity::NetworkTransient, TransportError::CannotDoTls => TransportErrorSeverity::NetworkTransient, TransportError::TransientMail(_) => TransportErrorSeverity::MailTransient,
204 TransportError::TransientMailbox(_) => TransportErrorSeverity::MailboxTransient,
205 TransportError::TransientMailSystem(_) => TransportErrorSeverity::MailSystemTransient,
206 TransportError::PermanentMail(_) => TransportErrorSeverity::MailPermanent,
207 TransportError::PermanentMailbox(_) => TransportErrorSeverity::MailboxPermanent,
208 TransportError::PermanentMailSystem(_) => TransportErrorSeverity::MailSystemPermanent,
209 TransportError::UnexpectedReplyCode(_) => TransportErrorSeverity::NetworkTransient,
210 TransportError::TimedOutSendingData => TransportErrorSeverity::NetworkTransient,
211 TransportError::SendingData(_) => TransportErrorSeverity::NetworkTransient,
212 TransportError::ReadingMail(_) => TransportErrorSeverity::Local,
213 }
214 }
215}
216
217async fn read_for_reply<T>(
218 fut: impl Future<Output = io::Result<T>>,
219 waiting_for_reply_since: &chrono::DateTime<Utc>,
220 timeout: chrono::Duration,
221) -> Result<T, TransportError> {
222 smol::future::or(
223 async { fut.await.map_err(TransportError::ReceivingReplyBytes) },
224 async {
225 let max_delay: std::time::Duration = (*waiting_for_reply_since + timeout - Utc::now())
228 .to_std()
229 .unwrap_or(ZERO_DURATION);
230 smol::Timer::after(max_delay).await;
231 Err(TransportError::TimedOutWaitingForReply)
232 },
233 )
234 .await
235}
236
237async fn read_reply<IO>(
238 io: &mut IO,
239 rdbuf: &mut [u8; RDBUF_SIZE],
240 unhandled: &mut Range<usize>,
241 timeout: chrono::Duration,
242) -> Result<Reply, TransportError>
243where
244 IO: Unpin + Send + AsyncRead + AsyncWrite,
245{
246 let start = Utc::now();
247 if (*unhandled).is_empty() {
249 *unhandled = 0..read_for_reply(io.read(rdbuf), &start, timeout).await?;
250 if (*unhandled).is_empty() {
251 return Err(TransportError::ConnectionAborted);
252 }
253 }
254 loop {
255 trace!(
256 buf = String::from_utf8_lossy(&rdbuf[unhandled.clone()]).as_ref(),
257 "Trying to parse from buffer"
258 );
259 match Reply::<&str>::parse(&rdbuf[unhandled.clone()]) {
260 Err(nom::Err::Incomplete(n)) => {
261 if unhandled.start != 0 {
263 let missing = match n {
265 nom::Needed::Unknown => MINIMUM_FREE_BUFSPACE,
266 nom::Needed::Size(s) => cmp::max(MINIMUM_FREE_BUFSPACE, s.into()),
267 };
268 if missing > rdbuf.len() - unhandled.end {
269 rdbuf.copy_within(unhandled.clone(), 0);
270 unhandled.end = unhandled.len();
271 unhandled.start = 0;
272 }
273 }
274 if unhandled.end == rdbuf.len() {
275 return Err(TransportError::TooLongReply(
281 String::from_utf8_lossy(&rdbuf[unhandled.clone()]).to_string(),
282 ));
283 } else {
284 let read =
285 read_for_reply(io.read(&mut rdbuf[unhandled.end..]), &start, timeout)
286 .await?;
287 if read == 0 {
288 return Err(TransportError::ConnectionAborted);
289 }
290 unhandled.end += read;
291 }
292 }
293 Err(_) => {
294 return Err(TransportError::SyntaxError(
297 String::from_utf8_lossy(&rdbuf[unhandled.clone()]).to_string(),
298 ));
299 }
300 Ok((rem, reply)) => {
301 unhandled.start = unhandled.end - rem.len();
303 return Ok(reply.into_owned());
307 }
308 }
309 }
310}
311
312fn verify_reply(r: Reply, expected: ReplyCodeKind) -> Result<(), TransportError> {
313 use EnhancedReplyCodeSubject::*;
314 use ReplyCodeKind::*;
315 use TransportError::*;
316 match (r.code.kind(), r.ecode.as_ref().map(|e| e.subject())) {
317 (k, _) if k == expected => Ok(()),
318 (TransientNegative, Some(Mailbox)) => Err(TransientMailbox(r)),
319 (PermanentNegative, Some(Mailbox)) => Err(PermanentMailbox(r)),
320 (TransientNegative, Some(MailSystem)) => Err(TransientMailSystem(r)),
321 (PermanentNegative, Some(MailSystem)) => Err(PermanentMailSystem(r)),
322 (TransientNegative, _) => Err(TransientMail(r)),
323 (PermanentNegative, _) => Err(PermanentMail(r)),
324 (_, _) => Err(UnexpectedReplyCode(r)),
325 }
326}
327
328async fn send_command<IO>(
329 io: &mut IO,
330 cmd: Command<&str>,
331 timeout: chrono::Duration,
332) -> Result<(), TransportError>
333where
334 IO: Unpin + Send + AsyncRead + AsyncWrite,
335{
336 trace!(
337 cmd = String::from_utf8_lossy(&{
338 let mut v = Vec::new();
339 for s in cmd.as_io_slices() {
340 v.extend_from_slice(&*s);
341 }
342 v
343 })
344 .as_ref(),
345 "Sending command"
346 );
347 smol::future::or(
348 async {
349 io.write_all_vectored(&mut cmd.as_io_slices().collect::<Vec<_>>())
350 .await
351 .map_err(TransportError::SendingCommand)?;
352 Ok(())
353 },
354 async {
355 smol::Timer::after(timeout.to_std().unwrap_or(ZERO_DURATION)).await;
356 Err(TransportError::TimedOutSendingCommand)
357 },
358 )
359 .await
360}
361
362pub struct Client<C, P, Cfg>
363where
364 C: trust_dns_resolver::proto::DnsHandle<Error = trust_dns_resolver::error::ResolveError>,
365 P: trust_dns_resolver::ConnectionProvider<Conn = C>,
366 Cfg: Config,
367{
368 resolver: AsyncResolver<C, P>,
369 cfg: Arc<Cfg>,
370}
371
372impl<C, P, Cfg> Client<C, P, Cfg>
373where
374 C: trust_dns_resolver::proto::DnsHandle<Error = trust_dns_resolver::error::ResolveError>,
375 P: trust_dns_resolver::ConnectionProvider<Conn = C>,
376 Cfg: Config,
377{
378 pub fn new(resolver: AsyncResolver<C, P>, cfg: Arc<Cfg>) -> Client<C, P, Cfg> {
384 Client { resolver, cfg }
385 }
386
387 pub async fn get_destination(&self, host: &Hostname) -> Result<Destination, TransportError> {
388 Ok(Destination { host: host.clone() })
391 }
392
393 pub async fn connect(&self, dest: &Destination) -> Result<Sender<Cfg>, TransportError> {
394 match dest.host {
395 Hostname::Ipv4 { ip, .. } => self.connect_to_ip(IpAddr::V4(ip), SMTP_PORT).await,
396 Hostname::Ipv6 { ip, .. } => self.connect_to_ip(IpAddr::V6(ip), SMTP_PORT).await,
397 Hostname::AsciiDomain { ref raw } => self.connect_to_mx(raw).await,
398 Hostname::Utf8Domain { ref punycode, .. } => self.connect_to_mx(punycode).await,
399 }
400 }
401
402 pub async fn connect_to_mx(&self, host: &str) -> Result<Sender<Cfg>, TransportError> {
403 let lookup = self.resolver.mx_lookup(host).await;
407 let lookup = match lookup {
408 Ok(l) => l,
409 Err(e) => {
410 if let ResolveErrorKind::NoRecordsFound { .. } = e.kind() {
411 return self
413 .connect_to_host(
414 host.into_name()
415 .map_err(|e| TransportError::HostToTrustDns(host.to_owned(), e))?,
416 SMTP_PORT,
417 )
418 .await;
419 } else {
420 return Err(TransportError::DnsMx(host.to_owned(), e));
421 }
422 }
423 };
424
425 let mut mx_records = BTreeMap::new();
427 for record in lookup.iter() {
428 mx_records
429 .entry(record.preference())
430 .or_insert_with(|| Vec::with_capacity(1))
431 .push(record.exchange());
432 }
433
434 if mx_records.is_empty() {
436 return self
439 .connect_to_host(
440 host.into_name()
441 .map_err(|e| TransportError::HostToTrustDns(host.to_owned(), e))?,
442 SMTP_PORT,
443 )
444 .await;
445 }
446
447 let mut first_error = None;
451 for (_, mut mxes) in mx_records {
452 mxes.shuffle(&mut rand::thread_rng());
455
456 for mx in mxes {
461 match self.connect_to_host(mx.clone(), SMTP_PORT).await {
462 Ok(sender) => return Ok(sender),
463 Err(e) => first_error = first_error.or(Some(e)),
464 }
465 }
466 }
467
468 Err(first_error.unwrap())
477 }
478
479 async fn connect_to_host(
480 &self,
481 name: trust_dns_resolver::Name,
482 port: u16,
483 ) -> Result<Sender<Cfg>, TransportError> {
484 let lookup = self
486 .resolver
487 .lookup_ip(name.clone())
488 .await
489 .map_err(|e| TransportError::DnsIp(name, e))?;
490
491 let mut first_error = None;
495 for ip in lookup.iter() {
496 match self.connect_to_ip(ip, port).await {
497 Ok(sender) => return Ok(sender),
498 Err(e) => first_error = first_error.or(Some(e)),
499 }
500 }
501
502 Err(first_error.unwrap())
504 }
505
506 pub async fn connect_to_ip(
507 &self,
508 ip: IpAddr,
509 port: u16,
510 ) -> Result<Sender<Cfg>, TransportError> {
511 trace!("Connecting to ip {}:{}", ip, port);
513 let io = TcpStream::connect((ip, port))
516 .await
517 .map_err(|e| TransportError::Connecting(ip, port, e))?;
518 let (reader, writer) = io.split();
519 self.connect_to_stream(duplexify::Duplex::new(Box::pin(reader), Box::pin(writer)))
520 .await
521 }
522
523 pub async fn connect_to_stream(
526 &self,
527 io: DynAsyncReadWrite,
528 ) -> Result<Sender<Cfg>, TransportError> {
529 let mut sender = Sender {
530 io,
531 rdbuf: [0; RDBUF_SIZE],
532 unhandled: 0..0,
533 extensions: Extensions::empty(),
534 cfg: self.cfg.clone(),
535 };
536 let reply = read_reply(
541 &mut sender.io,
542 &mut sender.rdbuf,
543 &mut sender.unhandled,
544 self.cfg.banner_read_timeout(),
545 )
546 .await?;
547 verify_reply(reply, ReplyCodeKind::PositiveCompletion)?;
548
549 self.send_ehlo(&mut sender).await?;
553
554 let mut did_tls = false;
556 if sender.extensions.contains(Extensions::STARTTLS) && self.cfg.can_do_tls() {
557 send_command(
559 &mut sender.io,
560 Command::Starttls,
561 self.cfg.command_write_timeout(),
562 )
563 .await?;
564 let reply = read_reply(
565 &mut sender.io,
566 &mut sender.rdbuf,
567 &mut sender.unhandled,
568 self.cfg.starttls_reply_timeout(),
569 )
570 .await?;
571 if let Ok(()) = verify_reply(reply, ReplyCodeKind::PositiveCompletion) {
572 sender.io = self
575 .cfg
576 .tls_connect(sender.io)
577 .await
578 .map_err(TransportError::NegotiatingTls)?;
579 self.send_ehlo(&mut sender).await?;
592 did_tls = true;
593 } else {
594 }
599 }
600 if !did_tls && self.cfg.must_do_tls() {
601 return Err(TransportError::CannotDoTls);
602 }
603
604 Ok(sender)
607 }
608
609 async fn send_ehlo(&self, sender: &mut Sender<Cfg>) -> Result<(), TransportError> {
610 send_command(
611 &mut sender.io,
612 Command::Ehlo {
613 hostname: self.cfg.ehlo_hostname().to_ref(),
614 },
615 self.cfg.command_write_timeout(),
616 )
617 .await?;
618
619 let reply = read_reply(
621 &mut sender.io,
622 &mut sender.rdbuf,
623 &mut sender.unhandled,
624 self.cfg.ehlo_reply_timeout(),
625 )
626 .await?;
627 sender.extensions = Extensions::empty();
628 for line in reply.text.iter() {
629 if line.as_str().eq_ignore_ascii_case("STARTTLS") {
631 sender.extensions.insert(Extensions::STARTTLS);
632 }
633 }
634 verify_reply(reply, ReplyCodeKind::PositiveCompletion)?;
635
636 Ok(())
637 }
638}
639
640bitflags! {
641 struct Extensions: u8 {
642 const STARTTLS = 0b1;
643 }
644}
645
646pub struct Sender<Cfg> {
647 io: DynAsyncReadWrite,
648 rdbuf: [u8; RDBUF_SIZE],
649 unhandled: Range<usize>,
650 extensions: Extensions,
651 cfg: Arc<Cfg>,
652}
653
654impl<Cfg> Sender<Cfg>
655where
656 Cfg: Config,
657{
658 pub async fn send<Reader>(
665 &mut self,
666 from: Option<&Email>,
667 to: &Email,
668 mail: Reader,
669 ) -> Result<(), TransportError>
670 where
671 Reader: AsyncRead,
672 {
673 macro_rules! send_command {
674 ($cmd:expr) => {
675 send_command(&mut self.io, $cmd, self.cfg.command_write_timeout())
676 };
677 }
678 macro_rules! read_reply {
679 ($expected:expr, $timeout:expr) => {
680 async {
681 let reply =
682 read_reply(&mut self.io, &mut self.rdbuf, &mut self.unhandled, $timeout)
683 .await?;
684 verify_reply(reply, $expected)
685 }
686 };
687 }
688
689 send_command!(Command::Mail {
691 path: None,
692 email: from.map(|f| f.to_ref()),
693 params: Parameters(Vec::new()),
694 })
695 .await?;
696 read_reply!(
697 ReplyCodeKind::PositiveCompletion,
698 self.cfg.mail_reply_timeout()
699 )
700 .await?;
701
702 send_command!(Command::Rcpt {
704 path: None,
705 email: to.to_ref(),
706 params: Parameters(Vec::new()),
707 })
708 .await?;
709 read_reply!(
710 ReplyCodeKind::PositiveCompletion,
711 self.cfg.rcpt_reply_timeout()
712 )
713 .await?;
714
715 send_command!(Command::Data).await?;
717 read_reply!(
718 ReplyCodeKind::PositiveIntermediate,
719 self.cfg.data_init_reply_timeout()
720 )
721 .await?;
722
723 {
725 pin_mut!(mail);
726 let cfg = self.cfg.clone();
727 let mut databuf = [0; DATABUF_SIZE];
728 loop {
729 match mail.read(&mut databuf).await {
730 Ok(0) => {
731 break;
733 }
734 Ok(n) => {
735 smol::future::or(
737 async {
738 self.io
739 .write_all(&databuf[..n])
740 .await
741 .map_err(TransportError::SendingData)
742 },
743 async {
744 smol::Timer::after(
745 cfg.data_block_write_timeout()
746 .to_std()
747 .unwrap_or(ZERO_DURATION),
748 )
749 .await;
750 Err(TransportError::TimedOutSendingData)
751 },
752 )
753 .await?;
754 }
755 Err(e) => return Err(TransportError::ReadingMail(e)),
756 }
757 }
758 }
759
760 read_reply!(
762 ReplyCodeKind::PositiveCompletion,
763 self.cfg.data_end_reply_timeout()
764 )
765 .await?;
766
767 Ok(())
768 }
769}
770
771