smtp_client/
lib.rs

1use std::{
2    cmp, collections::BTreeMap, fmt, future::Future, io, net::IpAddr, ops::Range, pin::Pin,
3    sync::Arc,
4};
5
6use async_trait::async_trait;
7use bitflags::bitflags;
8use chrono::Utc;
9use futures::{pin_mut, AsyncRead, AsyncReadExt, AsyncWrite, AsyncWriteExt};
10use rand::prelude::SliceRandom;
11use smol::net::TcpStream;
12use tracing::trace;
13use trust_dns_resolver::{
14    error::{ResolveError, ResolveErrorKind},
15    proto::error::ProtoError,
16    AsyncResolver, IntoName,
17};
18
19use smtp_message::{
20    nom, Command, Email, EnhancedReplyCodeSubject, Hostname, Parameters, Reply, ReplyCodeKind,
21};
22
23const SMTP_PORT: u16 = 25;
24
25const RDBUF_SIZE: usize = 16 * 1024;
26const DATABUF_SIZE: usize = 16 * 1024;
27const MINIMUM_FREE_BUFSPACE: usize = 128;
28
29const ZERO_DURATION: std::time::Duration = std::time::Duration::from_secs(0);
30
31pub type DynAsyncReadWrite =
32    duplexify::Duplex<Pin<Box<dyn Send + AsyncRead>>, Pin<Box<dyn Send + AsyncWrite>>>;
33
34#[derive(Eq, Hash, PartialEq)]
35pub struct Destination {
36    host: Hostname,
37}
38
39impl fmt::Display for Destination {
40    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
41        self.host.fmt(f)
42    }
43}
44
45#[async_trait]
46pub trait Config {
47    fn ehlo_hostname(&self) -> Hostname<String>;
48
49    fn can_do_tls(&self) -> bool {
50        true
51    }
52
53    // TODO: make this parameterized on the destination
54    fn must_do_tls(&self) -> bool {
55        false
56    }
57
58    /// Note: If this function can only fail, make can_do_tls return false
59    async fn tls_connect<IO>(&self, io: IO) -> io::Result<DynAsyncReadWrite>
60    where
61        IO: 'static + Unpin + Send + AsyncRead + AsyncWrite;
62
63    fn banner_read_timeout(&self) -> chrono::Duration {
64        chrono::Duration::minutes(5)
65    }
66
67    fn command_write_timeout(&self) -> chrono::Duration {
68        chrono::Duration::minutes(5)
69    }
70
71    fn ehlo_reply_timeout(&self) -> chrono::Duration {
72        chrono::Duration::minutes(5)
73    }
74
75    fn starttls_reply_timeout(&self) -> chrono::Duration {
76        chrono::Duration::minutes(2)
77    }
78
79    fn mail_reply_timeout(&self) -> chrono::Duration {
80        chrono::Duration::minutes(5)
81    }
82
83    fn rcpt_reply_timeout(&self) -> chrono::Duration {
84        chrono::Duration::minutes(5)
85    }
86
87    fn data_init_reply_timeout(&self) -> chrono::Duration {
88        chrono::Duration::minutes(2)
89    }
90
91    fn data_block_write_timeout(&self) -> chrono::Duration {
92        chrono::Duration::minutes(3)
93    }
94
95    fn data_end_reply_timeout(&self) -> chrono::Duration {
96        chrono::Duration::minutes(10)
97    }
98}
99
100#[derive(Debug, thiserror::Error)]
101pub enum TransportError {
102    #[error("Retrieving MX DNS records for ‘{0}’")]
103    DnsMx(String, #[source] ResolveError),
104
105    #[error("Converting hostname ‘{0}’ to to-be-resolved name")]
106    HostToTrustDns(String, #[source] ProtoError),
107
108    #[error("Retrieving IP DNS records for ‘{1}’")]
109    DnsIp(trust_dns_resolver::Name, #[source] ResolveError),
110
111    #[error("Connecting to ‘{0}’ port ‘{1}’")]
112    Connecting(IpAddr, u16, #[source] io::Error),
113
114    #[error("Receiving reply bytes")]
115    ReceivingReplyBytes(#[source] io::Error),
116
117    #[error("Timed out while waiting for a reply")]
118    TimedOutWaitingForReply,
119
120    #[error("Connection aborted")]
121    ConnectionAborted,
122
123    #[error("Reply does not fit in buffer: ‘{0}’")]
124    TooLongReply(String),
125
126    #[error("Syntax error parsing as a reply: ‘{0}’")]
127    SyntaxError(String),
128
129    #[error("Timed out while sending a command")]
130    TimedOutSendingCommand,
131
132    #[error("Sending command")]
133    SendingCommand(#[source] io::Error),
134
135    #[error("Negotiating TLS")]
136    NegotiatingTls(#[source] io::Error),
137
138    #[error("Cannot do TLS with remote server")]
139    CannotDoTls,
140
141    // TODO: add the command as error context
142    #[error("Mail-level transient issue: {0}")]
143    TransientMail(Reply),
144
145    #[error("Mailbox-level transient issue: {0}")]
146    TransientMailbox(Reply),
147
148    #[error("Mail system-level transient issue: {0}")]
149    TransientMailSystem(Reply),
150
151    #[error("Mail-level permanent issue: {0}")]
152    PermanentMail(Reply),
153
154    #[error("Mailbox-level permanent issue: {0}")]
155    PermanentMailbox(Reply),
156
157    #[error("Mail system-level permanent issue: {0}")]
158    PermanentMailSystem(Reply),
159
160    #[error("Unexpected reply code: {0}")]
161    UnexpectedReplyCode(Reply),
162
163    #[error("Timed out while sending data")]
164    TimedOutSendingData,
165
166    #[error("Sending data")]
167    SendingData(#[source] io::Error),
168
169    #[error("Reading the mail from the provided reader")]
170    ReadingMail(#[source] io::Error),
171}
172
173pub enum TransportErrorSeverity {
174    Local,
175    NetworkTransient,
176    MailTransient,
177    MailboxTransient,
178    MailSystemTransient,
179    MailPermanent,
180    MailboxPermanent,
181    MailSystemPermanent,
182}
183
184impl TransportError {
185    pub fn severity(&self) -> TransportErrorSeverity {
186        // TODO: Re-run over all these failure modes and check that the kind assignment
187        // is correct. Maybe add categories like ProtocolPermanent for invalid
188        // hostnames, or LocalTransient for local errors like “too many sockets opened”?
189        match self {
190            TransportError::DnsMx(_, _) => TransportErrorSeverity::NetworkTransient,
191            TransportError::HostToTrustDns(_, _) => TransportErrorSeverity::Local,
192            TransportError::DnsIp(_, _) => TransportErrorSeverity::NetworkTransient,
193            TransportError::Connecting(_, _, _) => TransportErrorSeverity::NetworkTransient,
194            TransportError::ReceivingReplyBytes(_) => TransportErrorSeverity::NetworkTransient,
195            TransportError::TimedOutWaitingForReply => TransportErrorSeverity::NetworkTransient,
196            TransportError::ConnectionAborted => TransportErrorSeverity::NetworkTransient,
197            TransportError::TooLongReply(_) => TransportErrorSeverity::NetworkTransient,
198            TransportError::SyntaxError(_) => TransportErrorSeverity::MailSystemTransient,
199            TransportError::TimedOutSendingCommand => TransportErrorSeverity::NetworkTransient,
200            TransportError::SendingCommand(_) => TransportErrorSeverity::NetworkTransient,
201            TransportError::NegotiatingTls(_) => TransportErrorSeverity::NetworkTransient, /* TODO: MailSystemPermanent? */
202            TransportError::CannotDoTls => TransportErrorSeverity::NetworkTransient, /* TODO: MailSystemPermanent? */
203            TransportError::TransientMail(_) => TransportErrorSeverity::MailTransient,
204            TransportError::TransientMailbox(_) => TransportErrorSeverity::MailboxTransient,
205            TransportError::TransientMailSystem(_) => TransportErrorSeverity::MailSystemTransient,
206            TransportError::PermanentMail(_) => TransportErrorSeverity::MailPermanent,
207            TransportError::PermanentMailbox(_) => TransportErrorSeverity::MailboxPermanent,
208            TransportError::PermanentMailSystem(_) => TransportErrorSeverity::MailSystemPermanent,
209            TransportError::UnexpectedReplyCode(_) => TransportErrorSeverity::NetworkTransient,
210            TransportError::TimedOutSendingData => TransportErrorSeverity::NetworkTransient,
211            TransportError::SendingData(_) => TransportErrorSeverity::NetworkTransient,
212            TransportError::ReadingMail(_) => TransportErrorSeverity::Local,
213        }
214    }
215}
216
217async fn read_for_reply<T>(
218    fut: impl Future<Output = io::Result<T>>,
219    waiting_for_reply_since: &chrono::DateTime<Utc>,
220    timeout: chrono::Duration,
221) -> Result<T, TransportError> {
222    smol::future::or(
223        async { fut.await.map_err(TransportError::ReceivingReplyBytes) },
224        async {
225            // TODO: this should be smol::Timer::at, but we would need to convert from
226            // Chrono::DateTime<Utc> to std::time::Instant and I can't find how right now
227            let max_delay: std::time::Duration = (*waiting_for_reply_since + timeout - Utc::now())
228                .to_std()
229                .unwrap_or(ZERO_DURATION);
230            smol::Timer::after(max_delay).await;
231            Err(TransportError::TimedOutWaitingForReply)
232        },
233    )
234    .await
235}
236
237async fn read_reply<IO>(
238    io: &mut IO,
239    rdbuf: &mut [u8; RDBUF_SIZE],
240    unhandled: &mut Range<usize>,
241    timeout: chrono::Duration,
242) -> Result<Reply, TransportError>
243where
244    IO: Unpin + Send + AsyncRead + AsyncWrite,
245{
246    let start = Utc::now();
247    // TODO: try to think of unifying this logic with the one in smtp-server?
248    if (*unhandled).is_empty() {
249        *unhandled = 0..read_for_reply(io.read(rdbuf), &start, timeout).await?;
250        if (*unhandled).is_empty() {
251            return Err(TransportError::ConnectionAborted);
252        }
253    }
254    loop {
255        trace!(
256            buf = String::from_utf8_lossy(&rdbuf[unhandled.clone()]).as_ref(),
257            "Trying to parse from buffer"
258        );
259        match Reply::<&str>::parse(&rdbuf[unhandled.clone()]) {
260            Err(nom::Err::Incomplete(n)) => {
261                // Don't have enough data to handle command, let's fetch more
262                if unhandled.start != 0 {
263                    // Do we have to copy the data to the beginning of the buffer?
264                    let missing = match n {
265                        nom::Needed::Unknown => MINIMUM_FREE_BUFSPACE,
266                        nom::Needed::Size(s) => cmp::max(MINIMUM_FREE_BUFSPACE, s.into()),
267                    };
268                    if missing > rdbuf.len() - unhandled.end {
269                        rdbuf.copy_within(unhandled.clone(), 0);
270                        unhandled.end = unhandled.len();
271                        unhandled.start = 0;
272                    }
273                }
274                if unhandled.end == rdbuf.len() {
275                    // If we reach here, it means that unhandled is already
276                    // basically the full buffer. Which means that we have to
277                    // error out that the reply is too big.
278                    // TODO: maybe there's something intelligent to be done here, like parsing reply
279                    // line per reply line?
280                    return Err(TransportError::TooLongReply(
281                        String::from_utf8_lossy(&rdbuf[unhandled.clone()]).to_string(),
282                    ));
283                } else {
284                    let read =
285                        read_for_reply(io.read(&mut rdbuf[unhandled.end..]), &start, timeout)
286                            .await?;
287                    if read == 0 {
288                        return Err(TransportError::ConnectionAborted);
289                    }
290                    unhandled.end += read;
291                }
292            }
293            Err(_) => {
294                // Syntax error
295                // TODO: maybe we can recover better than this?
296                return Err(TransportError::SyntaxError(
297                    String::from_utf8_lossy(&rdbuf[unhandled.clone()]).to_string(),
298                ));
299            }
300            Ok((rem, reply)) => {
301                // Got a reply
302                unhandled.start = unhandled.end - rem.len();
303                // TODO: when polonius is ready, we can remove this allocation by returning a
304                // borrow of the input buffer (with NLL it conflicts with the mutable borrow of
305                // rdbuf in the other match arm)
306                return Ok(reply.into_owned());
307            }
308        }
309    }
310}
311
312fn verify_reply(r: Reply, expected: ReplyCodeKind) -> Result<(), TransportError> {
313    use EnhancedReplyCodeSubject::*;
314    use ReplyCodeKind::*;
315    use TransportError::*;
316    match (r.code.kind(), r.ecode.as_ref().map(|e| e.subject())) {
317        (k, _) if k == expected => Ok(()),
318        (TransientNegative, Some(Mailbox)) => Err(TransientMailbox(r)),
319        (PermanentNegative, Some(Mailbox)) => Err(PermanentMailbox(r)),
320        (TransientNegative, Some(MailSystem)) => Err(TransientMailSystem(r)),
321        (PermanentNegative, Some(MailSystem)) => Err(PermanentMailSystem(r)),
322        (TransientNegative, _) => Err(TransientMail(r)),
323        (PermanentNegative, _) => Err(PermanentMail(r)),
324        (_, _) => Err(UnexpectedReplyCode(r)),
325    }
326}
327
328async fn send_command<IO>(
329    io: &mut IO,
330    cmd: Command<&str>,
331    timeout: chrono::Duration,
332) -> Result<(), TransportError>
333where
334    IO: Unpin + Send + AsyncRead + AsyncWrite,
335{
336    trace!(
337        cmd = String::from_utf8_lossy(&{
338            let mut v = Vec::new();
339            for s in cmd.as_io_slices() {
340                v.extend_from_slice(&*s);
341            }
342            v
343        })
344        .as_ref(),
345        "Sending command"
346    );
347    smol::future::or(
348        async {
349            io.write_all_vectored(&mut cmd.as_io_slices().collect::<Vec<_>>())
350                .await
351                .map_err(TransportError::SendingCommand)?;
352            Ok(())
353        },
354        async {
355            smol::Timer::after(timeout.to_std().unwrap_or(ZERO_DURATION)).await;
356            Err(TransportError::TimedOutSendingCommand)
357        },
358    )
359    .await
360}
361
362pub struct Client<C, P, Cfg>
363where
364    C: trust_dns_resolver::proto::DnsHandle<Error = trust_dns_resolver::error::ResolveError>,
365    P: trust_dns_resolver::ConnectionProvider<Conn = C>,
366    Cfg: Config,
367{
368    resolver: AsyncResolver<C, P>,
369    cfg: Arc<Cfg>,
370}
371
372impl<C, P, Cfg> Client<C, P, Cfg>
373where
374    C: trust_dns_resolver::proto::DnsHandle<Error = trust_dns_resolver::error::ResolveError>,
375    P: trust_dns_resolver::ConnectionProvider<Conn = C>,
376    Cfg: Config,
377{
378    /// Note: Passing as `resolver` something that is configured with
379    /// `Ipv6andIpv4` may lead to unexpected behavior, as the client will
380    /// attempt to connect to both the Ipv6 and the Ipv4 address if whichever
381    /// comes first doesn't successfully connect. In particular, it means that
382    /// performance could be degraded.
383    pub fn new(resolver: AsyncResolver<C, P>, cfg: Arc<Cfg>) -> Client<C, P, Cfg> {
384        Client { resolver, cfg }
385    }
386
387    pub async fn get_destination(&self, host: &Hostname) -> Result<Destination, TransportError> {
388        // TODO: already resolve here, but that means having to handle DNS expiration
389        // down the road
390        Ok(Destination { host: host.clone() })
391    }
392
393    pub async fn connect(&self, dest: &Destination) -> Result<Sender<Cfg>, TransportError> {
394        match dest.host {
395            Hostname::Ipv4 { ip, .. } => self.connect_to_ip(IpAddr::V4(ip), SMTP_PORT).await,
396            Hostname::Ipv6 { ip, .. } => self.connect_to_ip(IpAddr::V6(ip), SMTP_PORT).await,
397            Hostname::AsciiDomain { ref raw } => self.connect_to_mx(raw).await,
398            Hostname::Utf8Domain { ref punycode, .. } => self.connect_to_mx(punycode).await,
399        }
400    }
401
402    pub async fn connect_to_mx(&self, host: &str) -> Result<Sender<Cfg>, TransportError> {
403        // TODO: consider adding a `.` at the end of `host`... but is it
404        // actually allowed?
405        // Run MX lookup
406        let lookup = self.resolver.mx_lookup(host).await;
407        let lookup = match lookup {
408            Ok(l) => l,
409            Err(e) => {
410                if let ResolveErrorKind::NoRecordsFound { .. } = e.kind() {
411                    // If there are no MX records, try A/AAAA records
412                    return self
413                        .connect_to_host(
414                            host.into_name()
415                                .map_err(|e| TransportError::HostToTrustDns(host.to_owned(), e))?,
416                            SMTP_PORT,
417                        )
418                        .await;
419                } else {
420                    return Err(TransportError::DnsMx(host.to_owned(), e));
421                }
422            }
423        };
424
425        // Retrieve the actual records
426        let mut mx_records = BTreeMap::new();
427        for record in lookup.iter() {
428            mx_records
429                .entry(record.preference())
430                .or_insert_with(|| Vec::with_capacity(1))
431                .push(record.exchange());
432        }
433
434        // If there are no MX records, try A/AAAA records
435        if mx_records.is_empty() {
436            // TODO: is this actually required? trust_dns_resolver should return
437            // NoRecordsFound anyway
438            return self
439                .connect_to_host(
440                    host.into_name()
441                        .map_err(|e| TransportError::HostToTrustDns(host.to_owned(), e))?,
442                    SMTP_PORT,
443                )
444                .await;
445        }
446
447        // By increasing order of priority, try each MX
448        // TODO: definitely should not return the first error but the first least severe
449        // error
450        let mut first_error = None;
451        for (_, mut mxes) in mx_records {
452            // Among a single priority level, randomize the order
453            // TODO: consider giving a way to seed for reproducibility?
454            mxes.shuffle(&mut rand::thread_rng());
455
456            // Then try to connect to each address
457            // TODO: sometimes the DNS server already returns the IP alongside the MX record
458            // in the answer to the MX request, in which case we could directly
459            // connect_to_ip
460            for mx in mxes {
461                match self.connect_to_host(mx.clone(), SMTP_PORT).await {
462                    Ok(sender) => return Ok(sender),
463                    Err(e) => first_error = first_error.or(Some(e)),
464                }
465            }
466        }
467
468        // The below unwrap is safe because, to reach it:
469        // - there must be some MX records or we'd have returned in the if above
470        // - there have been no error as otherwise first_error wouldn't be None
471        // - there must have only be errors as otherwise we'd have returned in the match
472        //   above
473        // Hence, if it triggers it means that \exists N, N > 1 \wedge N = 0, where N is
474        // the number of errors.
475        //   QED.
476        Err(first_error.unwrap())
477    }
478
479    async fn connect_to_host(
480        &self,
481        name: trust_dns_resolver::Name,
482        port: u16,
483    ) -> Result<Sender<Cfg>, TransportError> {
484        // Lookup the IP addresses associated with this name
485        let lookup = self
486            .resolver
487            .lookup_ip(name.clone())
488            .await
489            .map_err(|e| TransportError::DnsIp(name, e))?;
490
491        // Following the order given by the DNS server, attempt connecting
492        // TODO: definitely should not return the first error but the first least severe
493        // error
494        let mut first_error = None;
495        for ip in lookup.iter() {
496            match self.connect_to_ip(ip, port).await {
497                Ok(sender) => return Ok(sender),
498                Err(e) => first_error = first_error.or(Some(e)),
499            }
500        }
501
502        // See comment on connect_to_mx above for why this unwrap is correct
503        Err(first_error.unwrap())
504    }
505
506    pub async fn connect_to_ip(
507        &self,
508        ip: IpAddr,
509        port: u16,
510    ) -> Result<Sender<Cfg>, TransportError> {
511        // TODO: introduce a connection uuid to associate log messages together
512        trace!("Connecting to ip {}:{}", ip, port);
513        // TODO: bind to specified outgoing IP address with net2 (first bind the builder
514        // to the outgoing IP, then connect)
515        let io = TcpStream::connect((ip, port))
516            .await
517            .map_err(|e| TransportError::Connecting(ip, port, e))?;
518        let (reader, writer) = io.split();
519        self.connect_to_stream(duplexify::Duplex::new(Box::pin(reader), Box::pin(writer)))
520            .await
521    }
522
523    // TODO: add a connect_to_{host,ip}_smtps
524
525    pub async fn connect_to_stream(
526        &self,
527        io: DynAsyncReadWrite,
528    ) -> Result<Sender<Cfg>, TransportError> {
529        let mut sender = Sender {
530            io,
531            rdbuf: [0; RDBUF_SIZE],
532            unhandled: 0..0,
533            extensions: Extensions::empty(),
534            cfg: self.cfg.clone(),
535        };
536        // TODO: Are there interesting things to do with replies apart from checking
537        // they're successful? Maybe logging them or something like that?
538
539        // Read the banner
540        let reply = read_reply(
541            &mut sender.io,
542            &mut sender.rdbuf,
543            &mut sender.unhandled,
544            self.cfg.banner_read_timeout(),
545        )
546        .await?;
547        verify_reply(reply, ReplyCodeKind::PositiveCompletion)?;
548
549        // Send EHLO
550        // TODO: fallback to HELO if EHLO fails (also record somewhere that this
551        // destination doesn't support HELO)
552        self.send_ehlo(&mut sender).await?;
553
554        // Send STARTTLS if possible
555        let mut did_tls = false;
556        if sender.extensions.contains(Extensions::STARTTLS) && self.cfg.can_do_tls() {
557            // Send STARTTLS and check the reply
558            send_command(
559                &mut sender.io,
560                Command::Starttls,
561                self.cfg.command_write_timeout(),
562            )
563            .await?;
564            let reply = read_reply(
565                &mut sender.io,
566                &mut sender.rdbuf,
567                &mut sender.unhandled,
568                self.cfg.starttls_reply_timeout(),
569            )
570            .await?;
571            if let Ok(()) = verify_reply(reply, ReplyCodeKind::PositiveCompletion) {
572                // TODO: pipelining is forbidden across starttls, check unhandled.empty()
573                // Negotiate STARTTLS
574                sender.io = self
575                    .cfg
576                    .tls_connect(sender.io)
577                    .await
578                    .map_err(TransportError::NegotiatingTls)?;
579                // TODO: in case this call fails, maybe log? also, if
580                // we have must_do_tls, this server should probably be
581                // removed from the retry list as no matching ciphers
582                // is probably a permanent error.
583                //
584                // TODO: Retry without TLS enabled! Currently servers that support starttls but
585                // only with ancient ciphers are unreachable
586                //
587                // TODO: Split out the error condition “network error” from “negotiation failed”
588                // so as to know whether we should try STARTTLS again next time
589
590                // Send EHLO again
591                self.send_ehlo(&mut sender).await?;
592                did_tls = true;
593            } else {
594                // Server failed to accept STARTTLS. Let's fall through and
595                // continue without it (unless must_do_tls is enabled)
596                // TODO: maybe log? also, if we have must_do_tls and this
597                // returns a permanent error we definitely should bounce
598            }
599        }
600        if !did_tls && self.cfg.must_do_tls() {
601            return Err(TransportError::CannotDoTls);
602        }
603
604        // TODO: AUTH
605
606        Ok(sender)
607    }
608
609    async fn send_ehlo(&self, sender: &mut Sender<Cfg>) -> Result<(), TransportError> {
610        send_command(
611            &mut sender.io,
612            Command::Ehlo {
613                hostname: self.cfg.ehlo_hostname().to_ref(),
614            },
615            self.cfg.command_write_timeout(),
616        )
617        .await?;
618
619        // Parse the reply and verify it
620        let reply = read_reply(
621            &mut sender.io,
622            &mut sender.rdbuf,
623            &mut sender.unhandled,
624            self.cfg.ehlo_reply_timeout(),
625        )
626        .await?;
627        sender.extensions = Extensions::empty();
628        for line in reply.text.iter() {
629            // TODO: parse other extensions that may be of interest (eg. pipelining)
630            if line.as_str().eq_ignore_ascii_case("STARTTLS") {
631                sender.extensions.insert(Extensions::STARTTLS);
632            }
633        }
634        verify_reply(reply, ReplyCodeKind::PositiveCompletion)?;
635
636        Ok(())
637    }
638}
639
640bitflags! {
641    struct Extensions: u8 {
642        const STARTTLS = 0b1;
643    }
644}
645
646pub struct Sender<Cfg> {
647    io: DynAsyncReadWrite,
648    rdbuf: [u8; RDBUF_SIZE],
649    unhandled: Range<usize>,
650    extensions: Extensions,
651    cfg: Arc<Cfg>,
652}
653
654impl<Cfg> Sender<Cfg>
655where
656    Cfg: Config,
657{
658    // TODO: Figure out a way to batch a single mail (with the same metadata) going
659    // out to multiple recipients, so as to just use multiple RCPT TO
660    /// Note: `mail` must be a reader of the *already escaped and
661    /// CRLF-dot-CRLF-terminated* message! If this is not the format
662    /// you have, please looking into the `smtp-message` crate's
663    /// utilities.
664    pub async fn send<Reader>(
665        &mut self,
666        from: Option<&Email>,
667        to: &Email,
668        mail: Reader,
669    ) -> Result<(), TransportError>
670    where
671        Reader: AsyncRead,
672    {
673        macro_rules! send_command {
674            ($cmd:expr) => {
675                send_command(&mut self.io, $cmd, self.cfg.command_write_timeout())
676            };
677        }
678        macro_rules! read_reply {
679            ($expected:expr, $timeout:expr) => {
680                async {
681                    let reply =
682                        read_reply(&mut self.io, &mut self.rdbuf, &mut self.unhandled, $timeout)
683                            .await?;
684                    verify_reply(reply, $expected)
685                }
686            };
687        }
688
689        // MAIL FROM
690        send_command!(Command::Mail {
691            path: None,
692            email: from.map(|f| f.to_ref()),
693            params: Parameters(Vec::new()),
694        })
695        .await?;
696        read_reply!(
697            ReplyCodeKind::PositiveCompletion,
698            self.cfg.mail_reply_timeout()
699        )
700        .await?;
701
702        // RCPT TO
703        send_command!(Command::Rcpt {
704            path: None,
705            email: to.to_ref(),
706            params: Parameters(Vec::new()),
707        })
708        .await?;
709        read_reply!(
710            ReplyCodeKind::PositiveCompletion,
711            self.cfg.rcpt_reply_timeout()
712        )
713        .await?;
714
715        // DATA
716        send_command!(Command::Data).await?;
717        read_reply!(
718            ReplyCodeKind::PositiveIntermediate,
719            self.cfg.data_init_reply_timeout()
720        )
721        .await?;
722
723        // Send the contents of the email
724        {
725            pin_mut!(mail);
726            let cfg = self.cfg.clone();
727            let mut databuf = [0; DATABUF_SIZE];
728            loop {
729                match mail.read(&mut databuf).await {
730                    Ok(0) => {
731                        // End of stream
732                        break;
733                    }
734                    Ok(n) => {
735                        // Got n bytes, try sending with a timeout
736                        smol::future::or(
737                            async {
738                                self.io
739                                    .write_all(&databuf[..n])
740                                    .await
741                                    .map_err(TransportError::SendingData)
742                            },
743                            async {
744                                smol::Timer::after(
745                                    cfg.data_block_write_timeout()
746                                        .to_std()
747                                        .unwrap_or(ZERO_DURATION),
748                                )
749                                .await;
750                                Err(TransportError::TimedOutSendingData)
751                            },
752                        )
753                        .await?;
754                    }
755                    Err(e) => return Err(TransportError::ReadingMail(e)),
756                }
757            }
758        }
759
760        // Wait for a reply
761        read_reply!(
762            ReplyCodeKind::PositiveCompletion,
763            self.cfg.data_end_reply_timeout()
764        )
765        .await?;
766
767        Ok(())
768    }
769}
770
771// TODO: is it important to call QUIT before closing the TCP stream?
772
773// TODO: add tests