smtp_server/
lib.rs

1#![cfg_attr(test, feature(negative_impls))]
2#![type_length_limit = "200000000"]
3
4pub mod protocol;
5
6use std::{cmp, io, ops::Range, pin::Pin, sync::Arc};
7
8use async_trait::async_trait;
9use chrono::Utc;
10use futures::{
11    io::{AsyncRead, AsyncReadExt, AsyncWrite, AsyncWriteExt},
12    StreamExt,
13};
14use smol::future::FutureExt;
15use smtp_message::{
16    next_crlf, nom, Command, Email, EscapedDataReader, Hostname, MaybeUtf8, NextCrLfState, Reply,
17};
18
19pub use smtp_server_types::{reply, ConnectionMetadata, Decision, HelloInfo, MailMetadata};
20
21pub use protocol::{Protocol, ProtocolName};
22
23pub const RDBUF_SIZE: usize = 16 * 1024;
24const MINIMUM_FREE_BUFSPACE: usize = 128;
25
26#[async_trait]
27pub trait Config: Send + Sync {
28    type Protocol: for<'resp> Protocol<'resp>;
29
30    type ConnectionUserMeta: Send;
31    type MailUserMeta: Send;
32
33    /// Note: this function is only ever used for the default implementations of
34    /// other functions in this trait. As such, it is OK to leave it
35    /// `unimplemented!()` if other functions are implemented.
36    fn hostname(&self, conn_meta: &ConnectionMetadata<Self::ConnectionUserMeta>) -> &str;
37
38    /// Note: this function is only ever used for the default implementations of
39    /// other functions in this trait. As such, it is OK to leave it
40    /// `unimplemented!()` if other functions are implemented.
41    #[allow(unused_variables)]
42    fn welcome_banner(&self, conn_meta: &ConnectionMetadata<Self::ConnectionUserMeta>) -> &str {
43        "Service ready"
44    }
45
46    fn welcome_banner_reply(
47        &self,
48        conn_meta: &mut ConnectionMetadata<Self::ConnectionUserMeta>,
49    ) -> Reply {
50        reply::welcome_banner(self.hostname(conn_meta), self.welcome_banner(conn_meta))
51    }
52
53    /// Note: this function is only ever used for the default implementations of
54    /// other functions in this trait. As such, it is OK to leave it
55    /// `unimplemented!()` if other functions are implemented.
56    #[allow(unused_variables)]
57    fn hello_banner(&self, conn_meta: &ConnectionMetadata<Self::ConnectionUserMeta>) -> &str {
58        ""
59    }
60
61    #[allow(unused_variables)]
62    async fn filter_hello(
63        &self,
64        is_extended: bool,
65        hostname: Hostname,
66        conn_meta: &mut ConnectionMetadata<Self::ConnectionUserMeta>,
67    ) -> Decision<HelloInfo> {
68        // Set `conn_meta.hello` early so that can_do_tls can use it below
69        conn_meta.hello = Some(HelloInfo {
70            is_extended,
71            hostname: hostname.clone(),
72        });
73        Decision::Accept {
74            reply: reply::okay_hello(
75                is_extended,
76                self.hostname(conn_meta),
77                self.hello_banner(conn_meta),
78                self.can_do_tls(conn_meta),
79            )
80            .convert(),
81            res: HelloInfo {
82                is_extended,
83                hostname,
84            },
85        }
86    }
87
88    #[allow(unused_variables)]
89    fn can_do_tls(&self, conn_meta: &ConnectionMetadata<Self::ConnectionUserMeta>) -> bool {
90        !conn_meta.is_encrypted
91            && conn_meta
92                .hello
93                .as_ref()
94                .map(|h| h.is_extended)
95                .unwrap_or(false)
96    }
97
98    // TODO: when GATs are here, we can remove the trait object and return
99    // Self::TlsStream<IO> (or maybe we should refactor Config to be Config<IO>? but
100    // that's ugly). At that time we can probably get rid of all that duplexify
101    // mess... or maybe when we can do trait objects with more than one trait
102    /// Note: if you don't want to implement TLS, you should override
103    /// `can_do_tls` to return `false` so that STARTTLS is not advertized. This
104    /// being said, returning an error here should have the same result in
105    /// practice, except clients will try STARTTLS and fail
106    async fn tls_accept<IO>(
107        &self,
108        io: IO,
109        conn_meta: &mut ConnectionMetadata<Self::ConnectionUserMeta>,
110    ) -> io::Result<
111        duplexify::Duplex<Pin<Box<dyn Send + AsyncRead>>, Pin<Box<dyn Send + AsyncWrite>>>,
112    >
113    where
114        IO: 'static + Unpin + Send + AsyncRead + AsyncWrite;
115
116    async fn new_mail(
117        &self,
118        conn_meta: &mut ConnectionMetadata<Self::ConnectionUserMeta>,
119    ) -> Self::MailUserMeta;
120
121    async fn filter_from(
122        &self,
123        from: Option<Email>,
124        meta: &mut MailMetadata<Self::MailUserMeta>,
125        conn_meta: &mut ConnectionMetadata<Self::ConnectionUserMeta>,
126    ) -> Decision<Option<Email>>;
127
128    async fn filter_to(
129        &self,
130        to: Email,
131        meta: &mut MailMetadata<Self::MailUserMeta>,
132        conn_meta: &mut ConnectionMetadata<Self::ConnectionUserMeta>,
133    ) -> Decision<Email>;
134
135    #[allow(unused_variables)]
136    async fn filter_data(
137        &self,
138        meta: &mut MailMetadata<Self::MailUserMeta>,
139        conn_meta: &mut ConnectionMetadata<Self::ConnectionUserMeta>,
140    ) -> Decision<()> {
141        Decision::Accept {
142            reply: reply::okay_data().convert(),
143            res: (),
144        }
145    }
146
147    /// `handle_mail` is an async function that returns either a single decision
148    /// in the case of the SMTP protocol, or an async stream of decisions in the
149    /// case of the LMTP protocol.
150    ///
151    /// For LMTP: there must be one such decision for each RCPT TO command that
152    /// succeeded (i.e. for each accepted recipient), which allows to indicate
153    /// that the message could be stored in the mailbox of some users, but not
154    /// in that of other users. This can happen for instance if their mail
155    /// quota is used up. The lifetimes of the borrow on the mail's data stream
156    /// is limited to the duration of the async call; this reference may not be
157    /// retained by the returned stream. The async function must consume the
158    /// entire data stream before returning its stream of responses. The
159    /// recommended implementation of this function would start by reading the
160    /// message's content to a temporary file, and then produce a stream that
161    /// writes the message to all of the mailboxes one after the other.
162    ///
163    /// Note: the EscapedDataReader has an inner buffer size of
164    /// [`RDBUF_SIZE`](RDBUF_SIZE), which means that reads should not happen
165    /// with more than this buffer size.
166    ///
167    /// Also, note that there is no timeout applied here, so the implementation
168    /// of this function is responsible for making sure that the client does not
169    /// just stop sending anything to DOS the system.
170    async fn handle_mail<'resp, R>(
171        &'resp self,
172        stream: &mut EscapedDataReader<'_, R>, // not borrowed for whole 'resp lifetime
173        meta: MailMetadata<Self::MailUserMeta>,
174        conn_meta: &'resp mut ConnectionMetadata<Self::ConnectionUserMeta>,
175    ) -> <Self::Protocol as Protocol<'resp>>::HandleMailReturnType
176    where
177        R: Send + Unpin + AsyncRead,
178        Self: 'resp;
179
180    #[allow(unused_variables)]
181    async fn handle_rset(
182        &self,
183        meta: &mut Option<MailMetadata<Self::MailUserMeta>>,
184        conn_meta: &mut ConnectionMetadata<Self::ConnectionUserMeta>,
185    ) -> Decision<()> {
186        Decision::Accept {
187            reply: reply::okay_rset().convert(),
188            res: (),
189        }
190    }
191
192    #[allow(unused_variables)]
193    async fn handle_starttls(
194        &self,
195        conn_meta: &mut ConnectionMetadata<Self::ConnectionUserMeta>,
196    ) -> Decision<()> {
197        if self.can_do_tls(conn_meta) {
198            Decision::Accept {
199                reply: reply::okay_starttls().convert(),
200                res: (),
201            }
202        } else {
203            Decision::Reject {
204                reply: reply::command_not_supported().convert(),
205            }
206        }
207    }
208
209    #[allow(unused_variables)]
210    async fn handle_expn(
211        &self,
212        name: MaybeUtf8<&str>,
213        conn_meta: &mut ConnectionMetadata<Self::ConnectionUserMeta>,
214    ) -> Decision<()> {
215        Decision::Reject {
216            reply: reply::command_unimplemented().convert(),
217        }
218    }
219
220    #[allow(unused_variables)]
221    async fn handle_vrfy(
222        &self,
223        name: MaybeUtf8<&str>,
224        conn_meta: &mut ConnectionMetadata<Self::ConnectionUserMeta>,
225    ) -> Decision<()> {
226        Decision::Accept {
227            reply: reply::ignore_vrfy().convert(),
228            res: (),
229        }
230    }
231
232    #[allow(unused_variables)]
233    async fn handle_help(
234        &self,
235        subject: MaybeUtf8<&str>,
236        conn_meta: &mut ConnectionMetadata<Self::ConnectionUserMeta>,
237    ) -> Decision<()> {
238        Decision::Accept {
239            reply: reply::ignore_help().convert(),
240            res: (),
241        }
242    }
243
244    #[allow(unused_variables)]
245    async fn handle_noop(
246        &self,
247        string: MaybeUtf8<&str>,
248        conn_meta: &mut ConnectionMetadata<Self::ConnectionUserMeta>,
249    ) -> Decision<()> {
250        Decision::Accept {
251            reply: reply::okay_noop().convert(),
252            res: (),
253        }
254    }
255
256    #[allow(unused_variables)]
257    async fn handle_quit(
258        &self,
259        conn_meta: &mut ConnectionMetadata<Self::ConnectionUserMeta>,
260    ) -> Decision<()> {
261        Decision::Kill {
262            reply: Some(reply::okay_quit().convert()),
263            res: Ok(()),
264        }
265    }
266
267    #[allow(unused_variables)]
268    fn already_did_hello(
269        &self,
270        conn_meta: &mut ConnectionMetadata<Self::ConnectionUserMeta>,
271    ) -> Reply {
272        reply::bad_sequence().convert()
273    }
274
275    #[allow(unused_variables)]
276    fn mail_before_hello(
277        &self,
278        conn_meta: &mut ConnectionMetadata<Self::ConnectionUserMeta>,
279    ) -> Reply {
280        reply::bad_sequence().convert()
281    }
282
283    #[allow(unused_variables)]
284    fn already_in_mail(
285        &self,
286        conn_meta: &mut ConnectionMetadata<Self::ConnectionUserMeta>,
287    ) -> Reply {
288        reply::bad_sequence().convert()
289    }
290
291    #[allow(unused_variables)]
292    fn rcpt_before_mail(
293        &self,
294        conn_meta: &mut ConnectionMetadata<Self::ConnectionUserMeta>,
295    ) -> Reply {
296        reply::bad_sequence().convert()
297    }
298
299    #[allow(unused_variables)]
300    fn data_before_rcpt(
301        &self,
302        conn_meta: &mut ConnectionMetadata<Self::ConnectionUserMeta>,
303    ) -> Reply {
304        reply::bad_sequence().convert()
305    }
306
307    #[allow(unused_variables)]
308    fn data_before_mail(
309        &self,
310        conn_meta: &mut ConnectionMetadata<Self::ConnectionUserMeta>,
311    ) -> Reply {
312        reply::bad_sequence().convert()
313    }
314
315    #[allow(unused_variables)]
316    fn starttls_unsupported(
317        &self,
318        conn_meta: &mut ConnectionMetadata<Self::ConnectionUserMeta>,
319    ) -> Reply {
320        reply::command_not_supported().convert()
321    }
322
323    #[allow(unused_variables)]
324    fn command_unrecognized(
325        &self,
326        conn_meta: &mut ConnectionMetadata<Self::ConnectionUserMeta>,
327    ) -> Reply {
328        reply::command_unrecognized().convert()
329    }
330
331    #[allow(unused_variables)]
332    fn pipeline_forbidden_after_starttls(
333        &self,
334        conn_meta: &mut ConnectionMetadata<Self::ConnectionUserMeta>,
335    ) -> Reply {
336        reply::pipeline_forbidden_after_starttls().convert()
337    }
338
339    #[allow(unused_variables)]
340    fn line_too_long(&self, conn_meta: &mut ConnectionMetadata<Self::ConnectionUserMeta>) -> Reply {
341        reply::line_too_long().convert()
342    }
343
344    #[allow(unused_variables)]
345    fn handle_mail_did_not_call_complete(
346        &self,
347        conn_meta: &mut ConnectionMetadata<Self::ConnectionUserMeta>,
348    ) -> Reply {
349        reply::handle_mail_did_not_call_complete().convert()
350    }
351
352    fn reply_write_timeout(&self) -> chrono::Duration {
353        chrono::Duration::minutes(5)
354    }
355
356    fn command_read_timeout(&self) -> chrono::Duration {
357        chrono::Duration::minutes(5)
358    }
359}
360
361async fn advance_until_crlf<R>(
362    r: &mut R,
363    buf: &mut [u8],
364    unhandled: &mut Range<usize>,
365) -> io::Result<()>
366where
367    R: Unpin + AsyncRead,
368{
369    let mut state = NextCrLfState::Start;
370    loop {
371        if let Some(p) = next_crlf(&buf[unhandled.clone()], &mut state) {
372            unhandled.start += p + 1;
373            return Ok(());
374        } else {
375            let read = r.read(buf).await?;
376            if read == 0 {
377                return Err(io::Error::new(
378                    io::ErrorKind::ConnectionAborted,
379                    "connection shutdown while waiting for crlf after invalid command",
380                ));
381            }
382            *unhandled = 0..read;
383        }
384    }
385}
386
387#[derive(Clone, Copy, Eq, PartialEq)]
388pub enum IsAlreadyTls {
389    Yes,
390    No,
391}
392
393pub async fn interact<IO, Cfg>(
394    io: IO,
395    is_already_tls: IsAlreadyTls,
396    metadata: Cfg::ConnectionUserMeta,
397    cfg: Arc<Cfg>,
398) -> io::Result<()>
399where
400    IO: 'static + Send + AsyncRead + AsyncWrite,
401    Cfg: Config,
402{
403    let (io_r, io_w) = io.split();
404    let mut io = duplexify::Duplex::new(
405        Box::pin(io_r) as Pin<Box<dyn Send + AsyncRead>>,
406        Box::pin(io_w) as Pin<Box<dyn Send + AsyncWrite>>,
407    );
408
409    let rdbuf = &mut [0; RDBUF_SIZE];
410    let mut unhandled = 0..0;
411    // TODO: should have a wrslices: Vec<IoSlice> here, so that we don't allocate
412    // for each write, but it looks like the API for reusing a Vec's backing
413    // allocation isn't ready yet and IoSlice's lifetime is going to make this
414    // impossible. Maybe this would require writing a crate that allows such vec
415    // storage recycling, as there doesn't appear to be any on crates.io. Having
416    // the wrslices would allow us to avoid all the allocations at each
417    // .collect() (present in `send_reply()`)
418    let mut conn_meta = ConnectionMetadata {
419        user: metadata,
420        hello: None,
421        is_encrypted: is_already_tls == IsAlreadyTls::Yes,
422    };
423    let mut mail_meta = None;
424
425    let mut waiting_for_command_since = Utc::now();
426
427    macro_rules! read_for_command {
428        ($e:expr) => {
429            $e.or(async {
430                // TODO: this should be smol::Timer::at, but we would need to convert from
431                // Chrono::DateTime<Utc> to std::time::Instant and I can't find how right now
432                let max_delay: std::time::Duration =
433                    (waiting_for_command_since + cfg.command_read_timeout() - Utc::now())
434                        .to_std()
435                        .unwrap_or(std::time::Duration::from_secs(0));
436                smol::Timer::after(max_delay).await;
437                Err(io::Error::new(
438                    io::ErrorKind::TimedOut,
439                    "timed out waiting for a command",
440                ))
441            })
442        };
443    }
444
445    macro_rules! send_reply {
446        ($writer:expr, $reply:expr) => {
447            smol::future::or(
448                async {
449                    $writer
450                        .write_all_vectored(&mut $reply.as_io_slices().collect::<Vec<_>>())
451                        .await?;
452                    waiting_for_command_since = Utc::now();
453                    Ok(())
454                },
455                async {
456                    smol::Timer::after(
457                        cfg.reply_write_timeout()
458                            .to_std()
459                            .unwrap_or(std::time::Duration::from_secs(0)),
460                    )
461                    .await;
462                    Err(io::Error::new(
463                        io::ErrorKind::TimedOut,
464                        "timed out sending a reply",
465                    ))
466                },
467            )
468        };
469    }
470
471    macro_rules! dispatch_decision {
472        ($e:expr, Accept($reply:pat, $res:pat) => $accept:block) => {
473            dispatch_decision!($e,
474                Reject(reply) => {
475                    send_reply!(io, reply).await?
476                }
477                Accept($reply, $res) => $accept
478            )
479        };
480
481        (
482            $e:expr,
483            Reject($reply_r:pat) => $reject:block
484            Accept($reply_a:pat, $res_a:pat) => $accept:block
485        ) => {
486            match $e {
487                Decision::Accept { reply: $reply_a, res: $res_a } => $accept,
488                Decision::Reject { reply: $reply_r } => $reject,
489                Decision::Kill { reply, res } => {
490                    if let Some(r) = reply {
491                        send_reply!(io, r).await?;
492                    }
493                    return res;
494                }
495            }
496        };
497    }
498
499    macro_rules! simple_handler {
500        ($handler:expr) => {
501            dispatch_decision! {
502                $handler,
503                Accept(reply, ()) => {
504                    send_reply!(io, reply).await?;
505                }
506            }
507        };
508    }
509
510    send_reply!(io, cfg.welcome_banner_reply(&mut conn_meta)).await?;
511
512    loop {
513        if unhandled.is_empty() {
514            unhandled = 0..read_for_command!(io.read(rdbuf)).await?;
515            if unhandled.is_empty() {
516                return Ok(());
517            }
518        }
519
520        let cmd = match Command::<&str>::parse(&rdbuf[unhandled.clone()]) {
521            Err(nom::Err::Incomplete(n)) => {
522                // Don't have enough data to handle command, let's fetch more
523                if unhandled.start != 0 {
524                    // Do we have to copy the data to the beginning of the buffer?
525                    let missing = match n {
526                        nom::Needed::Unknown => MINIMUM_FREE_BUFSPACE,
527                        nom::Needed::Size(s) => cmp::max(MINIMUM_FREE_BUFSPACE, s.into()),
528                    };
529                    if missing > rdbuf.len() - unhandled.end {
530                        rdbuf.copy_within(unhandled.clone(), 0);
531                        unhandled.end = unhandled.len();
532                        unhandled.start = 0;
533                    }
534                }
535                if unhandled.end == rdbuf.len() {
536                    // If we reach here, it means that unhandled is already
537                    // basically the full buffer. Which means that we have to
538                    // error out that the line is too long.
539                    read_for_command!(advance_until_crlf(&mut io, rdbuf, &mut unhandled)).await?;
540                    send_reply!(io, cfg.line_too_long(&mut conn_meta)).await?;
541                } else {
542                    let read = read_for_command!(io.read(&mut rdbuf[unhandled.end..])).await?;
543                    if read == 0 {
544                        return Err(io::Error::new(
545                            io::ErrorKind::ConnectionAborted,
546                            "connection shutdown with partial command",
547                        ));
548                    }
549                    unhandled.end += read;
550                }
551                None
552            }
553            Err(_) => {
554                // Syntax error
555                read_for_command!(advance_until_crlf(&mut io, rdbuf, &mut unhandled)).await?;
556                send_reply!(io, cfg.command_unrecognized(&mut conn_meta)).await?;
557                None
558            }
559            Ok((rem, cmd)) => {
560                // Got a command
561                unhandled.start = unhandled.end - rem.len();
562                Some(cmd)
563            }
564        };
565
566        // This match is really just to avoid too much rightwards drift, otherwise it
567        // could have been included directly in the Ok((rem, cmd)) branch above.
568        // Unfortunately we can't make it a function, because `cmd` borrows `rdbuf`, and
569        // we need to use `rdbuf` in the `Command::Data` branch here
570        match cmd {
571            None => (),
572
573            Some(cmd @ (Command::Ehlo { .. } | Command::Helo { .. } | Command::Lhlo { .. })) => {
574                let (cmd_proto, is_extended, hostname) = match cmd {
575                    Command::Ehlo { hostname } => (ProtocolName::Smtp, true, hostname),
576                    Command::Helo { hostname } => (ProtocolName::Smtp, false, hostname),
577                    Command::Lhlo { hostname } => (ProtocolName::Lmtp, true, hostname),
578                    _ => unreachable!(),
579                };
580                if cmd_proto != <Cfg::Protocol as Protocol<'static>>::PROTOCOL {
581                    send_reply!(io, cfg.command_unrecognized(&mut conn_meta)).await?;
582                } else {
583                    match conn_meta.hello {
584                        Some(_) => {
585                            send_reply!(io, cfg.already_did_hello(&mut conn_meta)).await?;
586                        }
587                        None => dispatch_decision! {
588                            cfg.filter_hello(is_extended, hostname.into_owned(), &mut conn_meta)
589                                .await,
590                            Accept(reply, res) => {
591                                conn_meta.hello = Some(res);
592                                send_reply!(io, reply).await?;
593                            }
594                        },
595                    }
596                }
597            }
598
599            Some(Command::Mail {
600                path: _path,
601                email,
602                params: _params,
603            }) => {
604                if conn_meta.hello.is_none() {
605                    send_reply!(io, cfg.mail_before_hello(&mut conn_meta)).await?;
606                } else {
607                    match mail_meta {
608                        Some(_) => {
609                            // Both postfix and OpenSMTPD just return an error and ignore further
610                            // MAIL FROM when there is already a MAIL FROM running
611                            send_reply!(io, cfg.already_in_mail(&mut conn_meta)).await?;
612                        }
613                        None => {
614                            let mut mail_metadata = MailMetadata {
615                                user: cfg.new_mail(&mut conn_meta).await,
616                                from: None,
617                                to: Vec::with_capacity(4),
618                            };
619                            dispatch_decision! {
620                                cfg.filter_from(
621                                    email.as_ref().map(|e| e.clone().into_owned()),
622                                    &mut mail_metadata,
623                                    &mut conn_meta,
624                                )
625                                .await,
626                                Accept(reply, res) => {
627                                    mail_metadata.from = res;
628                                    mail_meta = Some(mail_metadata);
629                                    send_reply!(io, reply).await?;
630                                }
631                            }
632                        }
633                    }
634                }
635            }
636
637            Some(Command::Rcpt {
638                path: _path,
639                email,
640                params: _params,
641            }) => match mail_meta {
642                None => {
643                    send_reply!(io, cfg.rcpt_before_mail(&mut conn_meta)).await?;
644                }
645                Some(ref mut mail_meta_unw) => dispatch_decision! {
646                    cfg.filter_to(email.into_owned(), mail_meta_unw, &mut conn_meta).await,
647                    Accept(reply, res) => {
648                        mail_meta_unw.to.push(res);
649                        send_reply!(io, reply).await?;
650                    }
651                },
652            },
653
654            Some(Command::Data) => match mail_meta.take() {
655                None => {
656                    send_reply!(io, cfg.data_before_mail(&mut conn_meta)).await?;
657                }
658                Some(ref mail_meta_unw) if mail_meta_unw.to.is_empty() => {
659                    send_reply!(io, cfg.data_before_rcpt(&mut conn_meta)).await?;
660                }
661                Some(mut mail_meta_unw) => {
662                    dispatch_decision! {
663                        cfg.filter_data(&mut mail_meta_unw, &mut conn_meta).await,
664                        Reject(reply) => {
665                            mail_meta = Some(mail_meta_unw);
666                            send_reply!(io, reply).await?;
667                        }
668                        Accept(reply, ()) => {
669                            send_reply!(io, reply).await?;
670                            let mut reader =
671                                EscapedDataReader::new(rdbuf, unhandled.clone(), &mut io);
672                            let expected_n_decisions = match <Cfg::Protocol as Protocol<'static>>::PROTOCOL {
673                                ProtocolName::Smtp => 1,
674                                ProtocolName::Lmtp => mail_meta_unw.to.len(),
675                            };
676                            let mut decision_stream = <Cfg::Protocol as Protocol<'_>>::handle_mail_return_type_as_stream(cfg
677                                .handle_mail(&mut reader, mail_meta_unw, &mut conn_meta).await);
678                            // This variable is a trick because otherwise rustc thinks the `reader`
679                            // borrow is still alive across await points and makes `interact: !Send`
680                            let reader_was_completed = if let Some(u) = reader.get_unhandled() {
681                                unhandled = u;
682                                true
683                            } else {
684                                false
685                            };
686                            if reader_was_completed {
687                                // Other mail systems (at least
688                                // postfix, OpenSMTPD and gmail)
689                                // appear to drop the state on an
690                                // unsuccessful DATA command (eg. too
691                                // long, non-RFC5322-compliant, etc.).
692                                // Couldn't find the RFC reference
693                                // anywhere, though.
694                                let mut n_decisions = 0;
695                                while let Some(decision) = decision_stream.next().await {
696                                    n_decisions += 1;
697                                    if n_decisions > expected_n_decisions {
698                                        panic!("got more decisions in handle_mail return than the expected {}", expected_n_decisions);
699                                    }
700                                    simple_handler!(decision);
701                                }
702                                assert_eq!(n_decisions, expected_n_decisions, "got {} decisions in handle_mail return, expected {}", n_decisions, expected_n_decisions);
703                            } else {
704                                // handle_mail did not call complete, let's read until the end and
705                                // then return an error
706                                // TODO: 128 is probably too small?
707                                let ignore_buf = &mut [0u8; 128];
708                                // TODO: consider whether it would make sense to have a separate
709                                // timeout here... giving as much time for sending the whole DATA
710                                // message may be a bit too little? but then it only happens when
711                                // handle_mail breaks anyway, so...
712                                while read_for_command!(reader.read(ignore_buf)).await? != 0 {}
713                                if !reader.is_finished() {
714                                    // Stream cut mid-connection
715                                    return Err(io::Error::new(
716                                        io::ErrorKind::ConnectionAborted,
717                                        "connection shutdown during email reception",
718                                    ));
719                                }
720                                reader.complete();
721                                unhandled = reader.get_unhandled().unwrap();
722                                // TODO: rustc complains if we don't drop(decision_stream) here, why?
723                                drop(decision_stream);
724                                for _i in 0..expected_n_decisions {
725                                    send_reply!(io, cfg.handle_mail_did_not_call_complete(&mut conn_meta)).await?;
726                                }
727                            };
728                        }
729                    }
730                }
731            },
732
733            Some(Command::Rset) => dispatch_decision! {
734                cfg.handle_rset(&mut mail_meta, &mut conn_meta).await,
735                Accept(reply, ()) => {
736                    mail_meta = None;
737                    send_reply!(io, reply).await?;
738                }
739            },
740
741            Some(Command::Starttls) => {
742                if !cfg.can_do_tls(&conn_meta) {
743                    send_reply!(io, cfg.starttls_unsupported(&mut conn_meta)).await?;
744                } else if !unhandled.is_empty() {
745                    send_reply!(io, cfg.pipeline_forbidden_after_starttls(&mut conn_meta)).await?;
746                } else {
747                    dispatch_decision! {
748                        cfg.handle_starttls(&mut conn_meta).await,
749                        Accept(reply, ()) => {
750                            send_reply!(io, reply).await?;
751                            io = cfg.tls_accept(io, &mut conn_meta).await?;
752                            mail_meta = None;
753                            conn_meta.is_encrypted = true;
754                            conn_meta.hello = None;
755                        }
756                    }
757                }
758            }
759
760            Some(Command::Expn { name }) => {
761                simple_handler!(cfg.handle_expn(name, &mut conn_meta).await)
762            }
763            Some(Command::Vrfy { name }) => {
764                simple_handler!(cfg.handle_vrfy(name, &mut conn_meta).await)
765            }
766            Some(Command::Help { subject }) => {
767                simple_handler!(cfg.handle_help(subject, &mut conn_meta).await)
768            }
769            Some(Command::Noop { string }) => {
770                simple_handler!(cfg.handle_noop(string, &mut conn_meta).await)
771            }
772            Some(Command::Quit) => simple_handler!(cfg.handle_quit(&mut conn_meta).await),
773        }
774    }
775}
776
777#[cfg(test)]
778mod tests {
779    use super::*;
780
781    use std::{
782        self, str,
783        sync::{Arc, Mutex},
784    };
785
786    use async_trait::async_trait;
787    use duplexify::Duplex;
788    use futures::executor;
789
790    use smtp_message::ReplyCode;
791
792    /// Used as `println!("{:?}", show_bytes(b))`
793    pub fn show_bytes(b: &[u8]) -> String {
794        if b.len() > 512 {
795            format!("{{too long, size = {}}}", b.len())
796        } else if let Ok(s) = str::from_utf8(b) {
797            s.into()
798        } else {
799            format!("{:?}", b)
800        }
801    }
802
803    struct TestConfig {
804        mails: Arc<Mutex<Vec<(Option<Email>, Vec<Email>, Vec<u8>)>>>,
805    }
806
807    #[async_trait]
808    impl Config for TestConfig {
809        type ConnectionUserMeta = ();
810        type MailUserMeta = ();
811        type Protocol = protocol::Smtp;
812
813        fn hostname(&self, _conn_meta: &ConnectionMetadata<()>) -> &str {
814            "test.example.org".into()
815        }
816
817        async fn new_mail(&self, _conn_meta: &mut ConnectionMetadata<()>) {}
818
819        async fn tls_accept<IO>(
820            &self,
821            mut io: IO,
822            _conn_meta: &mut ConnectionMetadata<Self::ConnectionUserMeta>,
823        ) -> io::Result<
824            duplexify::Duplex<Pin<Box<dyn Send + AsyncRead>>, Pin<Box<dyn Send + AsyncWrite>>>,
825        >
826        where
827            IO: 'static + Unpin + Send + AsyncRead + AsyncWrite,
828        {
829            io.write_all(b"<tls server>").await?;
830            let mut buf = [0; 12];
831            io.read_exact(&mut buf).await?;
832            assert_eq!(
833                &buf,
834                b"<tls client>",
835                "got TLS handshake that is not <tls client>: {:?}",
836                show_bytes(&buf)
837            );
838            let (r, w) = io.split();
839            Ok(duplexify::Duplex::new(Box::pin(r), Box::pin(w)))
840        }
841
842        async fn filter_from(
843            &self,
844            addr: Option<Email>,
845            _meta: &mut MailMetadata<()>,
846            _conn_meta: &mut ConnectionMetadata<()>,
847        ) -> Decision<Option<Email>> {
848            // TODO: have a helper function for the Email::parse_until that just works(tm)
849            // for uses such as this one
850            if addr == Some(Email::parse_bracketed(b"<bad@quux.example.org>").unwrap()) {
851                Decision::Reject {
852                    reply: Reply {
853                        code: ReplyCode::POLICY_REASON,
854                        ecode: None,
855                        text: vec!["User 'bad' banned".into()],
856                    },
857                }
858            } else {
859                Decision::Accept {
860                    reply: reply::okay_from().convert(),
861                    res: addr,
862                }
863            }
864        }
865
866        async fn filter_to(
867            &self,
868            email: Email,
869            _meta: &mut MailMetadata<()>,
870            _conn_meta: &mut ConnectionMetadata<()>,
871        ) -> Decision<Email> {
872            if email.localpart.raw() == "baz" {
873                Decision::Reject {
874                    reply: Reply {
875                        code: ReplyCode::MAILBOX_UNAVAILABLE,
876                        ecode: None,
877                        text: vec!["No user 'baz'".into()],
878                    },
879                }
880            } else {
881                Decision::Accept {
882                    reply: reply::okay_to().convert(),
883                    res: email,
884                }
885            }
886        }
887
888        async fn handle_mail<'resp, R>(
889            &'resp self,
890            reader: &mut EscapedDataReader<'_, R>,
891            meta: MailMetadata<()>,
892            _conn_meta: &'resp mut ConnectionMetadata<()>,
893        ) -> Decision<()>
894        where
895            R: Send + Unpin + AsyncRead,
896        {
897            let mut mail_text = Vec::new();
898            let res = reader.read_to_end(&mut mail_text).await;
899            if !reader.is_finished() {
900                // Note: this is a stupid buggy implementation.
901                // But it allows us to test more code in
902                // interrupted_data.
903                return Decision::Accept {
904                    reply: reply::okay_mail().convert(),
905                    res: (),
906                };
907            }
908            reader.complete();
909            if res.is_err() {
910                Decision::Reject {
911                    reply: Reply {
912                        code: ReplyCode::BAD_SEQUENCE,
913                        ecode: None,
914                        text: vec!["Closed the channel before end of message".into()],
915                    },
916                }
917            } else if mail_text.windows(5).position(|x| x == b"World").is_some() {
918                Decision::Reject {
919                    reply: Reply {
920                        code: ReplyCode::POLICY_REASON,
921                        ecode: None,
922                        text: vec!["Don't you dare say 'World'!".into()],
923                    },
924                }
925            } else {
926                self.mails
927                    .lock()
928                    .expect("failed to load mutex")
929                    .push((meta.from, meta.to, mail_text));
930                Decision::Accept {
931                    reply: reply::okay_mail().convert(),
932                    res: (),
933                }
934            }
935        }
936    }
937
938    #[test]
939    fn interacts_ok() {
940        let tests: &[(&[&[u8]], &[u8], &[(Option<&[u8]>, &[&[u8]], &[u8])])] = &[
941            (
942                &[b"EHLO test\r\n\
943                    MAIL FROM:<>\r\n\
944                    RCPT TO:<baz@quux.example.org>\r\n\
945                    RCPT TO:<foo2@bar.example.org>\r\n\
946                    RCPT TO:<foo3@bar.example.org>\r\n\
947                    DATA\r\n\
948                    Hello world\r\n\
949                    .\r\n\
950                    QUIT\r\n"],
951                b"220 test.example.org Service ready\r\n\
952                  250-test.example.org\r\n\
953                  250-8BITMIME\r\n\
954                  250-ENHANCEDSTATUSCODES\r\n\
955                  250-PIPELINING\r\n\
956                  250-SMTPUTF8\r\n\
957                  250 STARTTLS\r\n\
958                  250 2.0.0 Okay\r\n\
959                  550 No user 'baz'\r\n\
960                  250 2.1.5 Okay\r\n\
961                  250 2.1.5 Okay\r\n\
962                  354 Start mail input; end with <CRLF>.<CRLF>\r\n\
963                  250 2.0.0 Okay\r\n\
964                  221 2.0.0 Bye\r\n",
965                &[(
966                    None,
967                    &[b"<foo2@bar.example.org>", b"<foo3@bar.example.org>"],
968                    b"Hello world\r\n.\r\n",
969                )],
970            ),
971            (
972                &[b"HELO test\r\n\
973                    MAIL FROM:<test@example.org>\r\n\
974                    RCPT TO:<foo@example.org>\r\n\
975                    DATA\r\n\
976                    Hello World\r\n\
977                    .\r\n\
978                    QUIT\r\n"],
979                b"220 test.example.org Service ready\r\n\
980                  250 test.example.org\r\n\
981                  250 2.0.0 Okay\r\n\
982                  250 2.1.5 Okay\r\n\
983                  354 Start mail input; end with <CRLF>.<CRLF>\r\n\
984                  550 Don't you dare say 'World'!\r\n\
985                  221 2.0.0 Bye\r\n",
986                &[],
987            ),
988            (
989                &[b"HELO test\r\n\
990                    MAIL FROM:<bad@quux.example.org>\r\n\
991                    MAIL FROM:<foo@bar.example.org>\r\n\
992                    MAIL FROM:<baz@quux.example.org>\r\n\
993                    RCPT TO:<foo2@bar.example.org>\r\n\
994                    DATA\r\n\
995                    Hello\r\n\
996                    .\r\n\
997                    QUIT\r\n"],
998                b"220 test.example.org Service ready\r\n\
999                  250 test.example.org\r\n\
1000                  550 User 'bad' banned\r\n\
1001                  250 2.0.0 Okay\r\n\
1002                  503 5.5.1 Bad sequence of commands\r\n\
1003                  250 2.1.5 Okay\r\n\
1004                  354 Start mail input; end with <CRLF>.<CRLF>\r\n\
1005                  250 2.0.0 Okay\r\n\
1006                  221 2.0.0 Bye\r\n",
1007                &[(
1008                    Some(b"<foo@bar.example.org>"),
1009                    &[b"<foo2@bar.example.org>"],
1010                    b"Hello\r\n.\r\n",
1011                )],
1012            ),
1013            (
1014                &[b"HELO test\r\n\
1015                    MAIL FROM:<foo@bar.example.org>\r\n\
1016                    RSET\r\n\
1017                    MAIL FROM:<baz@quux.example.org>\r\n\
1018                    RCPT TO:<foo2@bar.example.org>\r\n\
1019                    DATA\r\n\
1020                    Hello\r\n\
1021                    .\r\n\
1022                    QUIT\r\n"],
1023                b"220 test.example.org Service ready\r\n\
1024                  250 test.example.org\r\n\
1025                  250 2.0.0 Okay\r\n\
1026                  250 2.0.0 Okay\r\n\
1027                  250 2.0.0 Okay\r\n\
1028                  250 2.1.5 Okay\r\n\
1029                  354 Start mail input; end with <CRLF>.<CRLF>\r\n\
1030                  250 2.0.0 Okay\r\n\
1031                  221 2.0.0 Bye\r\n",
1032                &[(
1033                    Some(b"<baz@quux.example.org>"),
1034                    &[b"<foo2@bar.example.org>"],
1035                    b"Hello\r\n.\r\n",
1036                )],
1037            ),
1038            (
1039                &[b"HELO test\r\n\
1040                    MAIL FROM:<foo@test.example.com>\r\n\
1041                    DATA\r\n\
1042                    QUIT\r\n"],
1043                b"220 test.example.org Service ready\r\n\
1044                  250 test.example.org\r\n\
1045                  250 2.0.0 Okay\r\n\
1046                  503 5.5.1 Bad sequence of commands\r\n\
1047                  221 2.0.0 Bye\r\n",
1048                &[],
1049            ),
1050            (
1051                &[b"HELO test\r\n\
1052                    MAIL FROM:<foo@test.example.com>\r\n\
1053                    RCPT TO:<foo@bar.example.org>\r\n"],
1054                b"220 test.example.org Service ready\r\n\
1055                  250 test.example.org\r\n\
1056                  250 2.0.0 Okay\r\n\
1057                  250 2.1.5 Okay\r\n",
1058                &[],
1059            ),
1060            (
1061                &[b"HELO test\r\n\
1062                    MAIL FROM:<foo@test.example.com>\r\n\
1063                    THISISNOTACOMMAND\r\n\
1064                    RCPT TO:<foo@bar.example.org>\r\n"],
1065                b"220 test.example.org Service ready\r\n\
1066                  250 test.example.org\r\n\
1067                  250 2.0.0 Okay\r\n\
1068                  500 5.5.1 Command not recognized\r\n\
1069                  250 2.1.5 Okay\r\n",
1070                &[],
1071            ),
1072            (
1073                &[b"MAIL FROM:<foo@test.example.com>\r\n"],
1074                b"220 test.example.org Service ready\r\n\
1075                  503 5.5.1 Bad sequence of commands\r\n",
1076                &[],
1077            ),
1078            (
1079                &[b"HELO test\r\n\
1080                    EXPN foo\r\n\
1081                    VRFY bar\r\n\
1082                    HELP baz\r\n\
1083                    NOOP\r\n"],
1084                b"220 test.example.org Service ready\r\n\
1085                  250 test.example.org\r\n\
1086                  502 5.5.1 Command not implemented\r\n\
1087                  252 2.1.5 Cannot VRFY user, but will accept message and attempt delivery\r\n\
1088                  214 2.0.0 See https://tools.ietf.org/html/rfc5321\r\n\
1089                  250 2.0.0 Okay\r\n",
1090                &[],
1091            ),
1092            (
1093                &[b"HELO test\r\n\
1094                    EXPN foo\r\n\
1095                    QUIT\r\n\
1096                    HELP baz\r\n"],
1097                b"220 test.example.org Service ready\r\n\
1098                  250 test.example.org\r\n\
1099                  502 5.5.1 Command not implemented\r\n\
1100                  221 2.0.0 Bye\r\n",
1101                &[],
1102            ),
1103            (
1104                &[
1105                    b"EHLO test\r\n\
1106                      STARTTLS\r\n",
1107                    b"<tls client>",
1108                    b"EHLO test2\r\n",
1109                ],
1110                b"220 test.example.org Service ready\r\n\
1111                  250-test.example.org\r\n\
1112                  250-8BITMIME\r\n\
1113                  250-ENHANCEDSTATUSCODES\r\n\
1114                  250-PIPELINING\r\n\
1115                  250-SMTPUTF8\r\n\
1116                  250 STARTTLS\r\n\
1117                  220 2.0.0 Ready to start TLS\r\n\
1118                  <tls server>\
1119                  250-test.example.org\r\n\
1120                  250-8BITMIME\r\n\
1121                  250-ENHANCEDSTATUSCODES\r\n\
1122                  250-PIPELINING\r\n\
1123                  250 SMTPUTF8\r\n",
1124                &[],
1125            ),
1126        ];
1127        for &(inp, out, mail) in tests {
1128            println!(
1129                "\nSending: {:?}",
1130                inp.iter().map(|b| show_bytes(*b)).collect::<Vec<_>>()
1131            );
1132            let resp_mail = Arc::new(Mutex::new(Vec::new()));
1133            let cfg = Arc::new(TestConfig {
1134                mails: resp_mail.clone(),
1135            });
1136            let (inp_pipe_r, mut inp_pipe_w) = piper::pipe(1024 * 1024);
1137            let (mut out_pipe_r, out_pipe_w) = piper::pipe(1024 * 1024);
1138            let io = Duplex::new(inp_pipe_r, out_pipe_w);
1139            let ((), resp) = smol::block_on(futures::future::join(
1140                async move {
1141                    for i in inp {
1142                        // Yield 100 times to be sure the interact process had enough time to
1143                        // process the data
1144                        for _ in 0..100usize {
1145                            smol::future::yield_now().await;
1146                        }
1147                        inp_pipe_w
1148                            .write_all(i)
1149                            .await
1150                            .expect("writing to input pipe");
1151                    }
1152                },
1153                async move {
1154                    interact(io, IsAlreadyTls::No, (), cfg)
1155                        .await
1156                        .expect("calling interact");
1157                    let mut resp = Vec::new();
1158                    out_pipe_r
1159                        .read_to_end(&mut resp)
1160                        .await
1161                        .expect("reading from output pipe");
1162                    resp
1163                },
1164            ));
1165
1166            println!("Expecting: {:?}", show_bytes(out));
1167            println!("Got      : {:?}", show_bytes(&resp));
1168            assert_eq!(resp, out);
1169
1170            println!("Checking mails:");
1171            let resp_mail = Arc::try_unwrap(resp_mail).unwrap().into_inner().unwrap();
1172            assert_eq!(resp_mail.len(), mail.len());
1173            for ((fr, tr, cr), &(fo, to, co)) in resp_mail.into_iter().zip(mail) {
1174                println!("Mail\n---");
1175
1176                println!("From: expected {:?}, got {:?}", fo, fr);
1177                assert_eq!(fo.map(|e| Email::parse_bracketed(e).unwrap()), fr);
1178
1179                let to = to
1180                    .iter()
1181                    .map(|e| Email::parse_bracketed(e).unwrap())
1182                    .collect::<Vec<_>>();
1183                println!("To: expected {:?}, got {:?}", to, tr);
1184                assert_eq!(to, tr);
1185
1186                println!("Expected text: {:?}", show_bytes(co));
1187                println!("Got text     : {:?}", show_bytes(&cr));
1188                assert_eq!(co, &cr[..]);
1189            }
1190        }
1191    }
1192
1193    // Fuzzer-found
1194    #[test]
1195    fn interrupted_data() {
1196        let inp: &[u8] = b"MAIL FROM:foo\r\n\
1197                           RCPT TO:bar\r\n\
1198                           DATA\r\n\
1199                           hello";
1200        let cfg = Arc::new(TestConfig {
1201            mails: Arc::new(Mutex::new(Vec::new())),
1202        });
1203        let (inp_pipe_r, mut inp_pipe_w) = piper::pipe(1024 * 1024);
1204        let (_out_pipe_r, out_pipe_w) = piper::pipe(1024 * 1024);
1205        let io = Duplex::new(inp_pipe_r, out_pipe_w);
1206        let err_kind = executor::block_on(async move {
1207            inp_pipe_w
1208                .write_all(inp)
1209                .await
1210                .expect("writing to input pipe");
1211            std::mem::drop(inp_pipe_w);
1212            interact(io, IsAlreadyTls::No, (), cfg)
1213                .await
1214                .expect_err("calling interact")
1215                .kind()
1216        });
1217        assert_eq!(err_kind, io::ErrorKind::ConnectionAborted,);
1218    }
1219
1220    // Fuzzer-found
1221    #[test]
1222    fn no_stack_overflow() {
1223        let inp: &[u8] =
1224            b"\r\n\r\n\r\n\r\n\r\n\r\n\r\n\r\n\r\n\r\n\r\n\n\r\n\n\r\n\r\n\r\n\r\n\r\n\n\r\n\n\r\
1225              \r\n\r\n\r\n\r\n\r\n\r\n\r\n\r\n\r\n\r\n\r\n\n\r\n\n\r\n\r\n\r\n\r\n\r\n\n\r\n\n\r\
1226              \r\n\r\n\r\n\r\n\r\n\r\n\r\n\r\n\r\n\r\n\r\n\n\r\n\n\r\n\r\n\r\n\r\n\r\n\n\r\n\n\r\
1227              \r\n\r\n\r\n\r\n\r\n\r\n\r\n\r\n\r\n\r\n\r\n\n\r\n\n\r\n\r\n\r\n\r\n\r\n\n\r\n\n\r\
1228              \r\n\r\n\r\n\r\n\r\n\r\n\r\n\r\n\r\n\r\n\r\n\n\r\n\n\r\n\r\n\r\n\r\n\r\n\n\r\n\n\r\
1229              \r\n\r\n\r\n\r\n\r\n\r\n\r\n\r\n\r\n\r\n\r\n\n\r\n\n\r\n\r\n\r\n\r\n\r\n\n\r\n\n\r\
1230              \r\n\r\n\r\n\r\n\r\n\r\n\r\n\r\n\r\n\r\n\r\n\n\r\n\n\r\n\r\n\r\n\r\n\r\n\n\r\n\n\r\
1231              \r\n\r\n\r\n\r\n\r\n\r\n\r\n\r\n\r\n\r\n\r\n\n\r\n\n\r\n\r\n\r\n\r\n\r\n\n\r\n\n\r\
1232              \r\n\r\n\r\n\r\n\r\n\r\n\r\n\r\n\r\n\r\n\r\n\n\r\n\n\r\n\r\n\r\n\r\n\r\n\n\r\n\n\r\
1233              \r\n\r\n\r\n\r\n\r\n\r\n\r\n\r\n\r\n\r\n\r\n\n\r\n\n\r\n\r\n\r\n\r\n\r\n\n\r\n\n\r\
1234              \r\n\r\n\r\n\r\n\r\n\r\n\r\n\r\n\r\n\r\n\r\n\n\r\n\n\r\n\r\n\r\n\r\n\r\n\n\r\n\n\r\
1235              \r\n\r\n\r\n\r\n\r\n\r\n\r\n\r\n\r\n\r\n\r\n\n\r\n\n\r\n\r\n\r\n\r\n\r\n\n\r\n\n\r\
1236              \r\n\r\n\r\n\r\n\r\n\r\n\r\n\r\n\r\n\r\n\r\n\n\r\n\n\r\n\r\n\r\n\r\n\r\n\n\r\n\n\r\
1237              \r\n\r\n\r\n\r\n\r\n\r\n\r\n\r\n\r\n\r\n\r\n\n\r\n\n\r\n\r\n\r\n\r\n\r\n\n\r\n\n\r\
1238              \r\n\r\n\r\n\r\n\r\n\r\n\r\n\r\n\r\n\r\n\r\n\n\r\n\n\r\n\r\n\r\n\r\n\r\n\n\r\n\n\r\
1239              \r\n\r\n\r\n\r\n\r\n\r\n\r\n\r\n\r\n\r\n\r\n\n\r\n\n\r\n\r\n\r\n\r\n\r\n\n\r\n\n\r\
1240              \r\n\r\n\r\n\r\n\r\n\r\n\r\n\r\n\r\n\r\n\r\n\n\r\n\n\r\n\r\n\r\n\r\n\r\n\n\r\n\n\r\
1241              \r\n\r\n\r\n\r\n\r\n\r\n\r\n\r\n\r\n\r\n\r\n\n\r\n\n\r\n\r\n\r\n\r\n\r\n\n\r\n\n\r\
1242              \r\n\r\n\r\n\r\n\r\n\r\n\r\n\r\n\r\n\r\n\r\n\n\r\n\n\r\n\r\n\r\n\r\n\r\n\n\r\n\n\r\
1243              \r\n\r\n\r\n\r\n\r\n\r\n\r\n\r\n\r\n\r\n\r\n\n\r\n\n\r\n\r\n\r\n\r\n\r\n\n\r\n\n\r\
1244              \r\n\r\n\r\n\r\n\r\n\r\n\r\n\r\n\r\n\r\n\r\n\n\r\n\n\r\n\r\n\r\n\r\n\r\n\n\r\n\n\r\
1245              \r\n\r\n\r\n\r\n\r\n\r\n\r\n\r\n\r\n\r\n\r\n\n\r\n\n\r\n\r\n\r\n\r\n\r\n\n\r\n\n\r\
1246              \r\n\r\n\r\n\r\n\r\n\r\n\r\n\r\n\r\n\r\n\r\n\n\r\n\n\r\n\r\n\r\n\r\n\r\n\n\r\n\n\r\
1247              \r\n\r\n\r\n\r\n\r\n\r\n\r\n\r\n\r\n\r\n\r\n\n\r\n\n\r\n\r\n\r\n\r\n\r\n\n\r\n\n\r\
1248              \r\n\r\n\r\n\r\n\r\n\r\n\r\n\r\n\r\n\r\n\r\n\n\r\n\n\r\n\r\n\r\n\r\n\r\n\n\r\n\n\r\
1249              \r\n\r\n\r\n\r\n\r\n\r\n\r\n\r\n\r\n\r\n\r\n\n\r\n\n\r\n\r\n\r\n\r\n\r\n\n\r\n\n\r\
1250              \r\n\r\n\r\n\r\n\r\n\r\n\r\n\r\n\r\n\r\n\r\n\n\r\n\n\r\n\r\n\r\n\r\n\r\n\n\r\n\n\r\
1251              \r\n\r\n\r\n\r\n\r\n\r\n\r\n\r\n\r\n\r\n\r\n\n\r\n\n\r\n\r\n\r\n\r\n\r\n\n\r\n\n\r\
1252              \r\n\r\n\r\n\r\n\r\n\r\n\r\n\r\n\r\n\r\n\r\n\n\r\n\n\r\n\r\n\r\n\r\n\r\n\n\r\n\n\r\
1253              \r\n\r\n\r\n\r\n\r\n\r\n\r\n\r\n\r\n\r\n\r\n\n\r\n\n\r\n\r\n\r\n\r\n\r\n\n\r\n\n\r\
1254              \r\n\r\n\r\n\r\n\r\n\r\n\r\n\r\n\r\n\r\n\r\n\n\r\n\n\r\n\r\n\r\n\r\n\r\n\n\r\n\n\r\
1255              \r\n\r\n\r\n\r\n\r\n\r\n\r\n\r\n\r\n\r\n\r\n\n\r\n\n\r\n\r\n\r\n\r\n\r\n\n\r\n\n\r\
1256              \r\n\r\n\r\n\r\n\r\n\r\n\r\n\r\n\r\n\r\n\r\n\n\r\n\n\r\n\r\n\r\n\r\n\r\n\n\r\n\n\r\
1257              \r\n\r\n\r\n\r\n\r\n\r\n\r\n\r\n\r\n\r\n\r\n\n\r\n\n\r\n\r\n\r\n\r\n\r\n\n\r\n\n\r\
1258              \r\n\r\n\r\n\r\n\r\n\r\n\r\n\r\n\r\n\r\n\r\n\n\r\n\n\r\n\r\n\r\n\r\n\r\n\n\r\n\n\r\
1259              \r\n\r\n\r\n\r\n\r\n\r\n\r\n\r\n\r\n\r\n\r\n\n\r\n\n\r\n\r\n\r\n\r\n\r\n\n\r\n\n\r\
1260              \r\n\r\n\r\n\r\n\r\n\r\n\r\n\r\n\r\n\r\n\r\n\n\r\n\n\r\n\r\n\r\n\r\n\r\n\n\r\n\n\r\
1261              \r\n\r\n\r\n\r\n\r\n\r\n\r\n\r\n\r\n\r\n\r\n\n\r\n\n\r\n\r\n\r\n\r\n\r\n\n\r\n\n\r\
1262              \r\n\r\n\r\n\r\n\r\n\r\n\r\n\r\n\r\n\r\n\r\n\n\r\n\n\r\n\r\n\r\n\r\n\r\n\n\r\n\n\r\
1263              \r\n\r\n\r\n\r\n\r\n\r\n\r\n\r\n\r\n\r\n\r\n\n\r\n\n\r\n\r\n\r\n\r\n\r\n\n\r\n\n\r\
1264              \r\n\r\n\r\n\r\n\r\n\r\n\r\n\r\n\r\n\r\n\r\n\n\r\n\n\r\n\r\n\r\n\r\n\r\n\n\r\n\n\r\
1265              \r\n\r\n\r\n\r\n\r\n\r\n\r\n\r\n\r\n\r\n\r\n\n\r\n\n\r\n\r\n\r\n\r\n\r\n\n\r\n\n\r\
1266              \r\n\r\n\r\n\r\n\r\n\r\n\r\n\r\n\r\n\r\n\r\n\n\r\n\n\r\n\r\n\r\n\r\n\r\n\n\r\n\n\r\
1267              \r\n\r\n\r\n\r\n\r\n\r\n\r\n\r\n\r\n\r\n\r\n\n\r\n\n\r\n\r\n\r\n\r\n\r\n\n\r\n\n\r\
1268              \r\n\r\n\r\n\r\n\r\n\r\n\r\n\r\n\r\n\r\n\r\n\n\r\n\n\r\n\r\n\r\n\r\n\r\n\n\r\n\n\r\
1269              \r\n\r\n\r\n\r\n\r\n\r\n\r\n\r\n\r\n\r\n\r\n\n\r\n\n\r\n\r\n\r\n\r\n\r\n\n\r\n\n\r\
1270              \r\n\r\n\r\n\r\n\r\n\r\n\r\n\r\n\r\n\r\n\r\n\n\r\n\n\r\n\r\n\r\n\r\n\r\n\n\r\n\n\r\
1271              \r\n\r\n\r\n\r\n\r\n\r\n\r\n\r\n\r\n\r\n\r\n\n\r\n\n\r\n\r\n\r\n\r\n\r\n\n\r\n\n\r\
1272              \r\n\r\n\r\n\r\n\r\n\r\n\r\n\r\n\r\n\r\n\r\n\n\r\n\n\r\n\r\n\r\n\r\n\r\n\n\r\n\n\r\
1273              \r\n\r\n\r\n\r\n\r\n\r\n\r\n\r\n\r\n\r\n\r\n\n\r\n\n\r\n\r\n\r\n\r\n\r\n\n\r\n\n\r\
1274              \r\n\r\n\r\n\r\n\r\n\r\n\r\n\r\n\r\n\r\n\r\n\n\r\n\n\r\n\r\n\r\n\r\n\r\n\n\r\n\r\n";
1275        let cfg = Arc::new(TestConfig {
1276            mails: Arc::new(Mutex::new(Vec::new())),
1277        });
1278        let (inp_pipe_r, mut inp_pipe_w) = piper::pipe(1024 * 1024);
1279        let (_out_pipe_r, out_pipe_w) = piper::pipe(1024 * 1024);
1280        let io = Duplex::new(inp_pipe_r, out_pipe_w);
1281        executor::block_on(async move {
1282            inp_pipe_w
1283                .write_all(inp)
1284                .await
1285                .expect("writing to input pipe");
1286            std::mem::drop(inp_pipe_w);
1287            interact(io, IsAlreadyTls::No, (), cfg)
1288                .await
1289                .expect("calling interact");
1290        });
1291    }
1292
1293    struct MinBoundsIo;
1294    impl !Sync for MinBoundsIo {}
1295    impl AsyncRead for MinBoundsIo {
1296        fn poll_read(
1297            self: std::pin::Pin<&mut Self>,
1298            _: &mut std::task::Context<'_>,
1299            _: &mut [u8],
1300        ) -> std::task::Poll<std::result::Result<usize, std::io::Error>> {
1301            unimplemented!()
1302        }
1303    }
1304    impl AsyncWrite for MinBoundsIo {
1305        fn poll_write(
1306            self: std::pin::Pin<&mut Self>,
1307            _: &mut std::task::Context<'_>,
1308            _: &[u8],
1309        ) -> std::task::Poll<std::result::Result<usize, std::io::Error>> {
1310            unimplemented!()
1311        }
1312
1313        fn poll_flush(
1314            self: std::pin::Pin<&mut Self>,
1315            _: &mut std::task::Context<'_>,
1316        ) -> std::task::Poll<std::result::Result<(), std::io::Error>> {
1317            unimplemented!()
1318        }
1319
1320        fn poll_close(
1321            self: std::pin::Pin<&mut Self>,
1322            _: &mut std::task::Context<'_>,
1323        ) -> std::task::Poll<std::result::Result<(), std::io::Error>> {
1324            unimplemented!()
1325        }
1326    }
1327
1328    fn assert_send<T: Send>(_: T) {}
1329
1330    #[test]
1331    fn interact_is_send() {
1332        let cfg = Arc::new(TestConfig {
1333            mails: Arc::new(Mutex::new(Vec::new())),
1334        });
1335        assert_send(interact(MinBoundsIo, IsAlreadyTls::No, (), cfg));
1336    }
1337}