1#![cfg_attr(test, feature(negative_impls))]
2#![type_length_limit = "200000000"]
3
4pub mod protocol;
5
6use std::{cmp, io, ops::Range, pin::Pin, sync::Arc};
7
8use async_trait::async_trait;
9use chrono::Utc;
10use futures::{
11 io::{AsyncRead, AsyncReadExt, AsyncWrite, AsyncWriteExt},
12 StreamExt,
13};
14use smol::future::FutureExt;
15use smtp_message::{
16 next_crlf, nom, Command, Email, EscapedDataReader, Hostname, MaybeUtf8, NextCrLfState, Reply,
17};
18
19pub use smtp_server_types::{reply, ConnectionMetadata, Decision, HelloInfo, MailMetadata};
20
21pub use protocol::{Protocol, ProtocolName};
22
23pub const RDBUF_SIZE: usize = 16 * 1024;
24const MINIMUM_FREE_BUFSPACE: usize = 128;
25
26#[async_trait]
27pub trait Config: Send + Sync {
28 type Protocol: for<'resp> Protocol<'resp>;
29
30 type ConnectionUserMeta: Send;
31 type MailUserMeta: Send;
32
33 fn hostname(&self, conn_meta: &ConnectionMetadata<Self::ConnectionUserMeta>) -> &str;
37
38 #[allow(unused_variables)]
42 fn welcome_banner(&self, conn_meta: &ConnectionMetadata<Self::ConnectionUserMeta>) -> &str {
43 "Service ready"
44 }
45
46 fn welcome_banner_reply(
47 &self,
48 conn_meta: &mut ConnectionMetadata<Self::ConnectionUserMeta>,
49 ) -> Reply {
50 reply::welcome_banner(self.hostname(conn_meta), self.welcome_banner(conn_meta))
51 }
52
53 #[allow(unused_variables)]
57 fn hello_banner(&self, conn_meta: &ConnectionMetadata<Self::ConnectionUserMeta>) -> &str {
58 ""
59 }
60
61 #[allow(unused_variables)]
62 async fn filter_hello(
63 &self,
64 is_extended: bool,
65 hostname: Hostname,
66 conn_meta: &mut ConnectionMetadata<Self::ConnectionUserMeta>,
67 ) -> Decision<HelloInfo> {
68 conn_meta.hello = Some(HelloInfo {
70 is_extended,
71 hostname: hostname.clone(),
72 });
73 Decision::Accept {
74 reply: reply::okay_hello(
75 is_extended,
76 self.hostname(conn_meta),
77 self.hello_banner(conn_meta),
78 self.can_do_tls(conn_meta),
79 )
80 .convert(),
81 res: HelloInfo {
82 is_extended,
83 hostname,
84 },
85 }
86 }
87
88 #[allow(unused_variables)]
89 fn can_do_tls(&self, conn_meta: &ConnectionMetadata<Self::ConnectionUserMeta>) -> bool {
90 !conn_meta.is_encrypted
91 && conn_meta
92 .hello
93 .as_ref()
94 .map(|h| h.is_extended)
95 .unwrap_or(false)
96 }
97
98 async fn tls_accept<IO>(
107 &self,
108 io: IO,
109 conn_meta: &mut ConnectionMetadata<Self::ConnectionUserMeta>,
110 ) -> io::Result<
111 duplexify::Duplex<Pin<Box<dyn Send + AsyncRead>>, Pin<Box<dyn Send + AsyncWrite>>>,
112 >
113 where
114 IO: 'static + Unpin + Send + AsyncRead + AsyncWrite;
115
116 async fn new_mail(
117 &self,
118 conn_meta: &mut ConnectionMetadata<Self::ConnectionUserMeta>,
119 ) -> Self::MailUserMeta;
120
121 async fn filter_from(
122 &self,
123 from: Option<Email>,
124 meta: &mut MailMetadata<Self::MailUserMeta>,
125 conn_meta: &mut ConnectionMetadata<Self::ConnectionUserMeta>,
126 ) -> Decision<Option<Email>>;
127
128 async fn filter_to(
129 &self,
130 to: Email,
131 meta: &mut MailMetadata<Self::MailUserMeta>,
132 conn_meta: &mut ConnectionMetadata<Self::ConnectionUserMeta>,
133 ) -> Decision<Email>;
134
135 #[allow(unused_variables)]
136 async fn filter_data(
137 &self,
138 meta: &mut MailMetadata<Self::MailUserMeta>,
139 conn_meta: &mut ConnectionMetadata<Self::ConnectionUserMeta>,
140 ) -> Decision<()> {
141 Decision::Accept {
142 reply: reply::okay_data().convert(),
143 res: (),
144 }
145 }
146
147 async fn handle_mail<'resp, R>(
171 &'resp self,
172 stream: &mut EscapedDataReader<'_, R>, meta: MailMetadata<Self::MailUserMeta>,
174 conn_meta: &'resp mut ConnectionMetadata<Self::ConnectionUserMeta>,
175 ) -> <Self::Protocol as Protocol<'resp>>::HandleMailReturnType
176 where
177 R: Send + Unpin + AsyncRead,
178 Self: 'resp;
179
180 #[allow(unused_variables)]
181 async fn handle_rset(
182 &self,
183 meta: &mut Option<MailMetadata<Self::MailUserMeta>>,
184 conn_meta: &mut ConnectionMetadata<Self::ConnectionUserMeta>,
185 ) -> Decision<()> {
186 Decision::Accept {
187 reply: reply::okay_rset().convert(),
188 res: (),
189 }
190 }
191
192 #[allow(unused_variables)]
193 async fn handle_starttls(
194 &self,
195 conn_meta: &mut ConnectionMetadata<Self::ConnectionUserMeta>,
196 ) -> Decision<()> {
197 if self.can_do_tls(conn_meta) {
198 Decision::Accept {
199 reply: reply::okay_starttls().convert(),
200 res: (),
201 }
202 } else {
203 Decision::Reject {
204 reply: reply::command_not_supported().convert(),
205 }
206 }
207 }
208
209 #[allow(unused_variables)]
210 async fn handle_expn(
211 &self,
212 name: MaybeUtf8<&str>,
213 conn_meta: &mut ConnectionMetadata<Self::ConnectionUserMeta>,
214 ) -> Decision<()> {
215 Decision::Reject {
216 reply: reply::command_unimplemented().convert(),
217 }
218 }
219
220 #[allow(unused_variables)]
221 async fn handle_vrfy(
222 &self,
223 name: MaybeUtf8<&str>,
224 conn_meta: &mut ConnectionMetadata<Self::ConnectionUserMeta>,
225 ) -> Decision<()> {
226 Decision::Accept {
227 reply: reply::ignore_vrfy().convert(),
228 res: (),
229 }
230 }
231
232 #[allow(unused_variables)]
233 async fn handle_help(
234 &self,
235 subject: MaybeUtf8<&str>,
236 conn_meta: &mut ConnectionMetadata<Self::ConnectionUserMeta>,
237 ) -> Decision<()> {
238 Decision::Accept {
239 reply: reply::ignore_help().convert(),
240 res: (),
241 }
242 }
243
244 #[allow(unused_variables)]
245 async fn handle_noop(
246 &self,
247 string: MaybeUtf8<&str>,
248 conn_meta: &mut ConnectionMetadata<Self::ConnectionUserMeta>,
249 ) -> Decision<()> {
250 Decision::Accept {
251 reply: reply::okay_noop().convert(),
252 res: (),
253 }
254 }
255
256 #[allow(unused_variables)]
257 async fn handle_quit(
258 &self,
259 conn_meta: &mut ConnectionMetadata<Self::ConnectionUserMeta>,
260 ) -> Decision<()> {
261 Decision::Kill {
262 reply: Some(reply::okay_quit().convert()),
263 res: Ok(()),
264 }
265 }
266
267 #[allow(unused_variables)]
268 fn already_did_hello(
269 &self,
270 conn_meta: &mut ConnectionMetadata<Self::ConnectionUserMeta>,
271 ) -> Reply {
272 reply::bad_sequence().convert()
273 }
274
275 #[allow(unused_variables)]
276 fn mail_before_hello(
277 &self,
278 conn_meta: &mut ConnectionMetadata<Self::ConnectionUserMeta>,
279 ) -> Reply {
280 reply::bad_sequence().convert()
281 }
282
283 #[allow(unused_variables)]
284 fn already_in_mail(
285 &self,
286 conn_meta: &mut ConnectionMetadata<Self::ConnectionUserMeta>,
287 ) -> Reply {
288 reply::bad_sequence().convert()
289 }
290
291 #[allow(unused_variables)]
292 fn rcpt_before_mail(
293 &self,
294 conn_meta: &mut ConnectionMetadata<Self::ConnectionUserMeta>,
295 ) -> Reply {
296 reply::bad_sequence().convert()
297 }
298
299 #[allow(unused_variables)]
300 fn data_before_rcpt(
301 &self,
302 conn_meta: &mut ConnectionMetadata<Self::ConnectionUserMeta>,
303 ) -> Reply {
304 reply::bad_sequence().convert()
305 }
306
307 #[allow(unused_variables)]
308 fn data_before_mail(
309 &self,
310 conn_meta: &mut ConnectionMetadata<Self::ConnectionUserMeta>,
311 ) -> Reply {
312 reply::bad_sequence().convert()
313 }
314
315 #[allow(unused_variables)]
316 fn starttls_unsupported(
317 &self,
318 conn_meta: &mut ConnectionMetadata<Self::ConnectionUserMeta>,
319 ) -> Reply {
320 reply::command_not_supported().convert()
321 }
322
323 #[allow(unused_variables)]
324 fn command_unrecognized(
325 &self,
326 conn_meta: &mut ConnectionMetadata<Self::ConnectionUserMeta>,
327 ) -> Reply {
328 reply::command_unrecognized().convert()
329 }
330
331 #[allow(unused_variables)]
332 fn pipeline_forbidden_after_starttls(
333 &self,
334 conn_meta: &mut ConnectionMetadata<Self::ConnectionUserMeta>,
335 ) -> Reply {
336 reply::pipeline_forbidden_after_starttls().convert()
337 }
338
339 #[allow(unused_variables)]
340 fn line_too_long(&self, conn_meta: &mut ConnectionMetadata<Self::ConnectionUserMeta>) -> Reply {
341 reply::line_too_long().convert()
342 }
343
344 #[allow(unused_variables)]
345 fn handle_mail_did_not_call_complete(
346 &self,
347 conn_meta: &mut ConnectionMetadata<Self::ConnectionUserMeta>,
348 ) -> Reply {
349 reply::handle_mail_did_not_call_complete().convert()
350 }
351
352 fn reply_write_timeout(&self) -> chrono::Duration {
353 chrono::Duration::minutes(5)
354 }
355
356 fn command_read_timeout(&self) -> chrono::Duration {
357 chrono::Duration::minutes(5)
358 }
359}
360
361async fn advance_until_crlf<R>(
362 r: &mut R,
363 buf: &mut [u8],
364 unhandled: &mut Range<usize>,
365) -> io::Result<()>
366where
367 R: Unpin + AsyncRead,
368{
369 let mut state = NextCrLfState::Start;
370 loop {
371 if let Some(p) = next_crlf(&buf[unhandled.clone()], &mut state) {
372 unhandled.start += p + 1;
373 return Ok(());
374 } else {
375 let read = r.read(buf).await?;
376 if read == 0 {
377 return Err(io::Error::new(
378 io::ErrorKind::ConnectionAborted,
379 "connection shutdown while waiting for crlf after invalid command",
380 ));
381 }
382 *unhandled = 0..read;
383 }
384 }
385}
386
387#[derive(Clone, Copy, Eq, PartialEq)]
388pub enum IsAlreadyTls {
389 Yes,
390 No,
391}
392
393pub async fn interact<IO, Cfg>(
394 io: IO,
395 is_already_tls: IsAlreadyTls,
396 metadata: Cfg::ConnectionUserMeta,
397 cfg: Arc<Cfg>,
398) -> io::Result<()>
399where
400 IO: 'static + Send + AsyncRead + AsyncWrite,
401 Cfg: Config,
402{
403 let (io_r, io_w) = io.split();
404 let mut io = duplexify::Duplex::new(
405 Box::pin(io_r) as Pin<Box<dyn Send + AsyncRead>>,
406 Box::pin(io_w) as Pin<Box<dyn Send + AsyncWrite>>,
407 );
408
409 let rdbuf = &mut [0; RDBUF_SIZE];
410 let mut unhandled = 0..0;
411 let mut conn_meta = ConnectionMetadata {
419 user: metadata,
420 hello: None,
421 is_encrypted: is_already_tls == IsAlreadyTls::Yes,
422 };
423 let mut mail_meta = None;
424
425 let mut waiting_for_command_since = Utc::now();
426
427 macro_rules! read_for_command {
428 ($e:expr) => {
429 $e.or(async {
430 let max_delay: std::time::Duration =
433 (waiting_for_command_since + cfg.command_read_timeout() - Utc::now())
434 .to_std()
435 .unwrap_or(std::time::Duration::from_secs(0));
436 smol::Timer::after(max_delay).await;
437 Err(io::Error::new(
438 io::ErrorKind::TimedOut,
439 "timed out waiting for a command",
440 ))
441 })
442 };
443 }
444
445 macro_rules! send_reply {
446 ($writer:expr, $reply:expr) => {
447 smol::future::or(
448 async {
449 $writer
450 .write_all_vectored(&mut $reply.as_io_slices().collect::<Vec<_>>())
451 .await?;
452 waiting_for_command_since = Utc::now();
453 Ok(())
454 },
455 async {
456 smol::Timer::after(
457 cfg.reply_write_timeout()
458 .to_std()
459 .unwrap_or(std::time::Duration::from_secs(0)),
460 )
461 .await;
462 Err(io::Error::new(
463 io::ErrorKind::TimedOut,
464 "timed out sending a reply",
465 ))
466 },
467 )
468 };
469 }
470
471 macro_rules! dispatch_decision {
472 ($e:expr, Accept($reply:pat, $res:pat) => $accept:block) => {
473 dispatch_decision!($e,
474 Reject(reply) => {
475 send_reply!(io, reply).await?
476 }
477 Accept($reply, $res) => $accept
478 )
479 };
480
481 (
482 $e:expr,
483 Reject($reply_r:pat) => $reject:block
484 Accept($reply_a:pat, $res_a:pat) => $accept:block
485 ) => {
486 match $e {
487 Decision::Accept { reply: $reply_a, res: $res_a } => $accept,
488 Decision::Reject { reply: $reply_r } => $reject,
489 Decision::Kill { reply, res } => {
490 if let Some(r) = reply {
491 send_reply!(io, r).await?;
492 }
493 return res;
494 }
495 }
496 };
497 }
498
499 macro_rules! simple_handler {
500 ($handler:expr) => {
501 dispatch_decision! {
502 $handler,
503 Accept(reply, ()) => {
504 send_reply!(io, reply).await?;
505 }
506 }
507 };
508 }
509
510 send_reply!(io, cfg.welcome_banner_reply(&mut conn_meta)).await?;
511
512 loop {
513 if unhandled.is_empty() {
514 unhandled = 0..read_for_command!(io.read(rdbuf)).await?;
515 if unhandled.is_empty() {
516 return Ok(());
517 }
518 }
519
520 let cmd = match Command::<&str>::parse(&rdbuf[unhandled.clone()]) {
521 Err(nom::Err::Incomplete(n)) => {
522 if unhandled.start != 0 {
524 let missing = match n {
526 nom::Needed::Unknown => MINIMUM_FREE_BUFSPACE,
527 nom::Needed::Size(s) => cmp::max(MINIMUM_FREE_BUFSPACE, s.into()),
528 };
529 if missing > rdbuf.len() - unhandled.end {
530 rdbuf.copy_within(unhandled.clone(), 0);
531 unhandled.end = unhandled.len();
532 unhandled.start = 0;
533 }
534 }
535 if unhandled.end == rdbuf.len() {
536 read_for_command!(advance_until_crlf(&mut io, rdbuf, &mut unhandled)).await?;
540 send_reply!(io, cfg.line_too_long(&mut conn_meta)).await?;
541 } else {
542 let read = read_for_command!(io.read(&mut rdbuf[unhandled.end..])).await?;
543 if read == 0 {
544 return Err(io::Error::new(
545 io::ErrorKind::ConnectionAborted,
546 "connection shutdown with partial command",
547 ));
548 }
549 unhandled.end += read;
550 }
551 None
552 }
553 Err(_) => {
554 read_for_command!(advance_until_crlf(&mut io, rdbuf, &mut unhandled)).await?;
556 send_reply!(io, cfg.command_unrecognized(&mut conn_meta)).await?;
557 None
558 }
559 Ok((rem, cmd)) => {
560 unhandled.start = unhandled.end - rem.len();
562 Some(cmd)
563 }
564 };
565
566 match cmd {
571 None => (),
572
573 Some(cmd @ (Command::Ehlo { .. } | Command::Helo { .. } | Command::Lhlo { .. })) => {
574 let (cmd_proto, is_extended, hostname) = match cmd {
575 Command::Ehlo { hostname } => (ProtocolName::Smtp, true, hostname),
576 Command::Helo { hostname } => (ProtocolName::Smtp, false, hostname),
577 Command::Lhlo { hostname } => (ProtocolName::Lmtp, true, hostname),
578 _ => unreachable!(),
579 };
580 if cmd_proto != <Cfg::Protocol as Protocol<'static>>::PROTOCOL {
581 send_reply!(io, cfg.command_unrecognized(&mut conn_meta)).await?;
582 } else {
583 match conn_meta.hello {
584 Some(_) => {
585 send_reply!(io, cfg.already_did_hello(&mut conn_meta)).await?;
586 }
587 None => dispatch_decision! {
588 cfg.filter_hello(is_extended, hostname.into_owned(), &mut conn_meta)
589 .await,
590 Accept(reply, res) => {
591 conn_meta.hello = Some(res);
592 send_reply!(io, reply).await?;
593 }
594 },
595 }
596 }
597 }
598
599 Some(Command::Mail {
600 path: _path,
601 email,
602 params: _params,
603 }) => {
604 if conn_meta.hello.is_none() {
605 send_reply!(io, cfg.mail_before_hello(&mut conn_meta)).await?;
606 } else {
607 match mail_meta {
608 Some(_) => {
609 send_reply!(io, cfg.already_in_mail(&mut conn_meta)).await?;
612 }
613 None => {
614 let mut mail_metadata = MailMetadata {
615 user: cfg.new_mail(&mut conn_meta).await,
616 from: None,
617 to: Vec::with_capacity(4),
618 };
619 dispatch_decision! {
620 cfg.filter_from(
621 email.as_ref().map(|e| e.clone().into_owned()),
622 &mut mail_metadata,
623 &mut conn_meta,
624 )
625 .await,
626 Accept(reply, res) => {
627 mail_metadata.from = res;
628 mail_meta = Some(mail_metadata);
629 send_reply!(io, reply).await?;
630 }
631 }
632 }
633 }
634 }
635 }
636
637 Some(Command::Rcpt {
638 path: _path,
639 email,
640 params: _params,
641 }) => match mail_meta {
642 None => {
643 send_reply!(io, cfg.rcpt_before_mail(&mut conn_meta)).await?;
644 }
645 Some(ref mut mail_meta_unw) => dispatch_decision! {
646 cfg.filter_to(email.into_owned(), mail_meta_unw, &mut conn_meta).await,
647 Accept(reply, res) => {
648 mail_meta_unw.to.push(res);
649 send_reply!(io, reply).await?;
650 }
651 },
652 },
653
654 Some(Command::Data) => match mail_meta.take() {
655 None => {
656 send_reply!(io, cfg.data_before_mail(&mut conn_meta)).await?;
657 }
658 Some(ref mail_meta_unw) if mail_meta_unw.to.is_empty() => {
659 send_reply!(io, cfg.data_before_rcpt(&mut conn_meta)).await?;
660 }
661 Some(mut mail_meta_unw) => {
662 dispatch_decision! {
663 cfg.filter_data(&mut mail_meta_unw, &mut conn_meta).await,
664 Reject(reply) => {
665 mail_meta = Some(mail_meta_unw);
666 send_reply!(io, reply).await?;
667 }
668 Accept(reply, ()) => {
669 send_reply!(io, reply).await?;
670 let mut reader =
671 EscapedDataReader::new(rdbuf, unhandled.clone(), &mut io);
672 let expected_n_decisions = match <Cfg::Protocol as Protocol<'static>>::PROTOCOL {
673 ProtocolName::Smtp => 1,
674 ProtocolName::Lmtp => mail_meta_unw.to.len(),
675 };
676 let mut decision_stream = <Cfg::Protocol as Protocol<'_>>::handle_mail_return_type_as_stream(cfg
677 .handle_mail(&mut reader, mail_meta_unw, &mut conn_meta).await);
678 let reader_was_completed = if let Some(u) = reader.get_unhandled() {
681 unhandled = u;
682 true
683 } else {
684 false
685 };
686 if reader_was_completed {
687 let mut n_decisions = 0;
695 while let Some(decision) = decision_stream.next().await {
696 n_decisions += 1;
697 if n_decisions > expected_n_decisions {
698 panic!("got more decisions in handle_mail return than the expected {}", expected_n_decisions);
699 }
700 simple_handler!(decision);
701 }
702 assert_eq!(n_decisions, expected_n_decisions, "got {} decisions in handle_mail return, expected {}", n_decisions, expected_n_decisions);
703 } else {
704 let ignore_buf = &mut [0u8; 128];
708 while read_for_command!(reader.read(ignore_buf)).await? != 0 {}
713 if !reader.is_finished() {
714 return Err(io::Error::new(
716 io::ErrorKind::ConnectionAborted,
717 "connection shutdown during email reception",
718 ));
719 }
720 reader.complete();
721 unhandled = reader.get_unhandled().unwrap();
722 drop(decision_stream);
724 for _i in 0..expected_n_decisions {
725 send_reply!(io, cfg.handle_mail_did_not_call_complete(&mut conn_meta)).await?;
726 }
727 };
728 }
729 }
730 }
731 },
732
733 Some(Command::Rset) => dispatch_decision! {
734 cfg.handle_rset(&mut mail_meta, &mut conn_meta).await,
735 Accept(reply, ()) => {
736 mail_meta = None;
737 send_reply!(io, reply).await?;
738 }
739 },
740
741 Some(Command::Starttls) => {
742 if !cfg.can_do_tls(&conn_meta) {
743 send_reply!(io, cfg.starttls_unsupported(&mut conn_meta)).await?;
744 } else if !unhandled.is_empty() {
745 send_reply!(io, cfg.pipeline_forbidden_after_starttls(&mut conn_meta)).await?;
746 } else {
747 dispatch_decision! {
748 cfg.handle_starttls(&mut conn_meta).await,
749 Accept(reply, ()) => {
750 send_reply!(io, reply).await?;
751 io = cfg.tls_accept(io, &mut conn_meta).await?;
752 mail_meta = None;
753 conn_meta.is_encrypted = true;
754 conn_meta.hello = None;
755 }
756 }
757 }
758 }
759
760 Some(Command::Expn { name }) => {
761 simple_handler!(cfg.handle_expn(name, &mut conn_meta).await)
762 }
763 Some(Command::Vrfy { name }) => {
764 simple_handler!(cfg.handle_vrfy(name, &mut conn_meta).await)
765 }
766 Some(Command::Help { subject }) => {
767 simple_handler!(cfg.handle_help(subject, &mut conn_meta).await)
768 }
769 Some(Command::Noop { string }) => {
770 simple_handler!(cfg.handle_noop(string, &mut conn_meta).await)
771 }
772 Some(Command::Quit) => simple_handler!(cfg.handle_quit(&mut conn_meta).await),
773 }
774 }
775}
776
777#[cfg(test)]
778mod tests {
779 use super::*;
780
781 use std::{
782 self, str,
783 sync::{Arc, Mutex},
784 };
785
786 use async_trait::async_trait;
787 use duplexify::Duplex;
788 use futures::executor;
789
790 use smtp_message::ReplyCode;
791
792 pub fn show_bytes(b: &[u8]) -> String {
794 if b.len() > 512 {
795 format!("{{too long, size = {}}}", b.len())
796 } else if let Ok(s) = str::from_utf8(b) {
797 s.into()
798 } else {
799 format!("{:?}", b)
800 }
801 }
802
803 struct TestConfig {
804 mails: Arc<Mutex<Vec<(Option<Email>, Vec<Email>, Vec<u8>)>>>,
805 }
806
807 #[async_trait]
808 impl Config for TestConfig {
809 type ConnectionUserMeta = ();
810 type MailUserMeta = ();
811 type Protocol = protocol::Smtp;
812
813 fn hostname(&self, _conn_meta: &ConnectionMetadata<()>) -> &str {
814 "test.example.org".into()
815 }
816
817 async fn new_mail(&self, _conn_meta: &mut ConnectionMetadata<()>) {}
818
819 async fn tls_accept<IO>(
820 &self,
821 mut io: IO,
822 _conn_meta: &mut ConnectionMetadata<Self::ConnectionUserMeta>,
823 ) -> io::Result<
824 duplexify::Duplex<Pin<Box<dyn Send + AsyncRead>>, Pin<Box<dyn Send + AsyncWrite>>>,
825 >
826 where
827 IO: 'static + Unpin + Send + AsyncRead + AsyncWrite,
828 {
829 io.write_all(b"<tls server>").await?;
830 let mut buf = [0; 12];
831 io.read_exact(&mut buf).await?;
832 assert_eq!(
833 &buf,
834 b"<tls client>",
835 "got TLS handshake that is not <tls client>: {:?}",
836 show_bytes(&buf)
837 );
838 let (r, w) = io.split();
839 Ok(duplexify::Duplex::new(Box::pin(r), Box::pin(w)))
840 }
841
842 async fn filter_from(
843 &self,
844 addr: Option<Email>,
845 _meta: &mut MailMetadata<()>,
846 _conn_meta: &mut ConnectionMetadata<()>,
847 ) -> Decision<Option<Email>> {
848 if addr == Some(Email::parse_bracketed(b"<bad@quux.example.org>").unwrap()) {
851 Decision::Reject {
852 reply: Reply {
853 code: ReplyCode::POLICY_REASON,
854 ecode: None,
855 text: vec!["User 'bad' banned".into()],
856 },
857 }
858 } else {
859 Decision::Accept {
860 reply: reply::okay_from().convert(),
861 res: addr,
862 }
863 }
864 }
865
866 async fn filter_to(
867 &self,
868 email: Email,
869 _meta: &mut MailMetadata<()>,
870 _conn_meta: &mut ConnectionMetadata<()>,
871 ) -> Decision<Email> {
872 if email.localpart.raw() == "baz" {
873 Decision::Reject {
874 reply: Reply {
875 code: ReplyCode::MAILBOX_UNAVAILABLE,
876 ecode: None,
877 text: vec!["No user 'baz'".into()],
878 },
879 }
880 } else {
881 Decision::Accept {
882 reply: reply::okay_to().convert(),
883 res: email,
884 }
885 }
886 }
887
888 async fn handle_mail<'resp, R>(
889 &'resp self,
890 reader: &mut EscapedDataReader<'_, R>,
891 meta: MailMetadata<()>,
892 _conn_meta: &'resp mut ConnectionMetadata<()>,
893 ) -> Decision<()>
894 where
895 R: Send + Unpin + AsyncRead,
896 {
897 let mut mail_text = Vec::new();
898 let res = reader.read_to_end(&mut mail_text).await;
899 if !reader.is_finished() {
900 return Decision::Accept {
904 reply: reply::okay_mail().convert(),
905 res: (),
906 };
907 }
908 reader.complete();
909 if res.is_err() {
910 Decision::Reject {
911 reply: Reply {
912 code: ReplyCode::BAD_SEQUENCE,
913 ecode: None,
914 text: vec!["Closed the channel before end of message".into()],
915 },
916 }
917 } else if mail_text.windows(5).position(|x| x == b"World").is_some() {
918 Decision::Reject {
919 reply: Reply {
920 code: ReplyCode::POLICY_REASON,
921 ecode: None,
922 text: vec!["Don't you dare say 'World'!".into()],
923 },
924 }
925 } else {
926 self.mails
927 .lock()
928 .expect("failed to load mutex")
929 .push((meta.from, meta.to, mail_text));
930 Decision::Accept {
931 reply: reply::okay_mail().convert(),
932 res: (),
933 }
934 }
935 }
936 }
937
938 #[test]
939 fn interacts_ok() {
940 let tests: &[(&[&[u8]], &[u8], &[(Option<&[u8]>, &[&[u8]], &[u8])])] = &[
941 (
942 &[b"EHLO test\r\n\
943 MAIL FROM:<>\r\n\
944 RCPT TO:<baz@quux.example.org>\r\n\
945 RCPT TO:<foo2@bar.example.org>\r\n\
946 RCPT TO:<foo3@bar.example.org>\r\n\
947 DATA\r\n\
948 Hello world\r\n\
949 .\r\n\
950 QUIT\r\n"],
951 b"220 test.example.org Service ready\r\n\
952 250-test.example.org\r\n\
953 250-8BITMIME\r\n\
954 250-ENHANCEDSTATUSCODES\r\n\
955 250-PIPELINING\r\n\
956 250-SMTPUTF8\r\n\
957 250 STARTTLS\r\n\
958 250 2.0.0 Okay\r\n\
959 550 No user 'baz'\r\n\
960 250 2.1.5 Okay\r\n\
961 250 2.1.5 Okay\r\n\
962 354 Start mail input; end with <CRLF>.<CRLF>\r\n\
963 250 2.0.0 Okay\r\n\
964 221 2.0.0 Bye\r\n",
965 &[(
966 None,
967 &[b"<foo2@bar.example.org>", b"<foo3@bar.example.org>"],
968 b"Hello world\r\n.\r\n",
969 )],
970 ),
971 (
972 &[b"HELO test\r\n\
973 MAIL FROM:<test@example.org>\r\n\
974 RCPT TO:<foo@example.org>\r\n\
975 DATA\r\n\
976 Hello World\r\n\
977 .\r\n\
978 QUIT\r\n"],
979 b"220 test.example.org Service ready\r\n\
980 250 test.example.org\r\n\
981 250 2.0.0 Okay\r\n\
982 250 2.1.5 Okay\r\n\
983 354 Start mail input; end with <CRLF>.<CRLF>\r\n\
984 550 Don't you dare say 'World'!\r\n\
985 221 2.0.0 Bye\r\n",
986 &[],
987 ),
988 (
989 &[b"HELO test\r\n\
990 MAIL FROM:<bad@quux.example.org>\r\n\
991 MAIL FROM:<foo@bar.example.org>\r\n\
992 MAIL FROM:<baz@quux.example.org>\r\n\
993 RCPT TO:<foo2@bar.example.org>\r\n\
994 DATA\r\n\
995 Hello\r\n\
996 .\r\n\
997 QUIT\r\n"],
998 b"220 test.example.org Service ready\r\n\
999 250 test.example.org\r\n\
1000 550 User 'bad' banned\r\n\
1001 250 2.0.0 Okay\r\n\
1002 503 5.5.1 Bad sequence of commands\r\n\
1003 250 2.1.5 Okay\r\n\
1004 354 Start mail input; end with <CRLF>.<CRLF>\r\n\
1005 250 2.0.0 Okay\r\n\
1006 221 2.0.0 Bye\r\n",
1007 &[(
1008 Some(b"<foo@bar.example.org>"),
1009 &[b"<foo2@bar.example.org>"],
1010 b"Hello\r\n.\r\n",
1011 )],
1012 ),
1013 (
1014 &[b"HELO test\r\n\
1015 MAIL FROM:<foo@bar.example.org>\r\n\
1016 RSET\r\n\
1017 MAIL FROM:<baz@quux.example.org>\r\n\
1018 RCPT TO:<foo2@bar.example.org>\r\n\
1019 DATA\r\n\
1020 Hello\r\n\
1021 .\r\n\
1022 QUIT\r\n"],
1023 b"220 test.example.org Service ready\r\n\
1024 250 test.example.org\r\n\
1025 250 2.0.0 Okay\r\n\
1026 250 2.0.0 Okay\r\n\
1027 250 2.0.0 Okay\r\n\
1028 250 2.1.5 Okay\r\n\
1029 354 Start mail input; end with <CRLF>.<CRLF>\r\n\
1030 250 2.0.0 Okay\r\n\
1031 221 2.0.0 Bye\r\n",
1032 &[(
1033 Some(b"<baz@quux.example.org>"),
1034 &[b"<foo2@bar.example.org>"],
1035 b"Hello\r\n.\r\n",
1036 )],
1037 ),
1038 (
1039 &[b"HELO test\r\n\
1040 MAIL FROM:<foo@test.example.com>\r\n\
1041 DATA\r\n\
1042 QUIT\r\n"],
1043 b"220 test.example.org Service ready\r\n\
1044 250 test.example.org\r\n\
1045 250 2.0.0 Okay\r\n\
1046 503 5.5.1 Bad sequence of commands\r\n\
1047 221 2.0.0 Bye\r\n",
1048 &[],
1049 ),
1050 (
1051 &[b"HELO test\r\n\
1052 MAIL FROM:<foo@test.example.com>\r\n\
1053 RCPT TO:<foo@bar.example.org>\r\n"],
1054 b"220 test.example.org Service ready\r\n\
1055 250 test.example.org\r\n\
1056 250 2.0.0 Okay\r\n\
1057 250 2.1.5 Okay\r\n",
1058 &[],
1059 ),
1060 (
1061 &[b"HELO test\r\n\
1062 MAIL FROM:<foo@test.example.com>\r\n\
1063 THISISNOTACOMMAND\r\n\
1064 RCPT TO:<foo@bar.example.org>\r\n"],
1065 b"220 test.example.org Service ready\r\n\
1066 250 test.example.org\r\n\
1067 250 2.0.0 Okay\r\n\
1068 500 5.5.1 Command not recognized\r\n\
1069 250 2.1.5 Okay\r\n",
1070 &[],
1071 ),
1072 (
1073 &[b"MAIL FROM:<foo@test.example.com>\r\n"],
1074 b"220 test.example.org Service ready\r\n\
1075 503 5.5.1 Bad sequence of commands\r\n",
1076 &[],
1077 ),
1078 (
1079 &[b"HELO test\r\n\
1080 EXPN foo\r\n\
1081 VRFY bar\r\n\
1082 HELP baz\r\n\
1083 NOOP\r\n"],
1084 b"220 test.example.org Service ready\r\n\
1085 250 test.example.org\r\n\
1086 502 5.5.1 Command not implemented\r\n\
1087 252 2.1.5 Cannot VRFY user, but will accept message and attempt delivery\r\n\
1088 214 2.0.0 See https://tools.ietf.org/html/rfc5321\r\n\
1089 250 2.0.0 Okay\r\n",
1090 &[],
1091 ),
1092 (
1093 &[b"HELO test\r\n\
1094 EXPN foo\r\n\
1095 QUIT\r\n\
1096 HELP baz\r\n"],
1097 b"220 test.example.org Service ready\r\n\
1098 250 test.example.org\r\n\
1099 502 5.5.1 Command not implemented\r\n\
1100 221 2.0.0 Bye\r\n",
1101 &[],
1102 ),
1103 (
1104 &[
1105 b"EHLO test\r\n\
1106 STARTTLS\r\n",
1107 b"<tls client>",
1108 b"EHLO test2\r\n",
1109 ],
1110 b"220 test.example.org Service ready\r\n\
1111 250-test.example.org\r\n\
1112 250-8BITMIME\r\n\
1113 250-ENHANCEDSTATUSCODES\r\n\
1114 250-PIPELINING\r\n\
1115 250-SMTPUTF8\r\n\
1116 250 STARTTLS\r\n\
1117 220 2.0.0 Ready to start TLS\r\n\
1118 <tls server>\
1119 250-test.example.org\r\n\
1120 250-8BITMIME\r\n\
1121 250-ENHANCEDSTATUSCODES\r\n\
1122 250-PIPELINING\r\n\
1123 250 SMTPUTF8\r\n",
1124 &[],
1125 ),
1126 ];
1127 for &(inp, out, mail) in tests {
1128 println!(
1129 "\nSending: {:?}",
1130 inp.iter().map(|b| show_bytes(*b)).collect::<Vec<_>>()
1131 );
1132 let resp_mail = Arc::new(Mutex::new(Vec::new()));
1133 let cfg = Arc::new(TestConfig {
1134 mails: resp_mail.clone(),
1135 });
1136 let (inp_pipe_r, mut inp_pipe_w) = piper::pipe(1024 * 1024);
1137 let (mut out_pipe_r, out_pipe_w) = piper::pipe(1024 * 1024);
1138 let io = Duplex::new(inp_pipe_r, out_pipe_w);
1139 let ((), resp) = smol::block_on(futures::future::join(
1140 async move {
1141 for i in inp {
1142 for _ in 0..100usize {
1145 smol::future::yield_now().await;
1146 }
1147 inp_pipe_w
1148 .write_all(i)
1149 .await
1150 .expect("writing to input pipe");
1151 }
1152 },
1153 async move {
1154 interact(io, IsAlreadyTls::No, (), cfg)
1155 .await
1156 .expect("calling interact");
1157 let mut resp = Vec::new();
1158 out_pipe_r
1159 .read_to_end(&mut resp)
1160 .await
1161 .expect("reading from output pipe");
1162 resp
1163 },
1164 ));
1165
1166 println!("Expecting: {:?}", show_bytes(out));
1167 println!("Got : {:?}", show_bytes(&resp));
1168 assert_eq!(resp, out);
1169
1170 println!("Checking mails:");
1171 let resp_mail = Arc::try_unwrap(resp_mail).unwrap().into_inner().unwrap();
1172 assert_eq!(resp_mail.len(), mail.len());
1173 for ((fr, tr, cr), &(fo, to, co)) in resp_mail.into_iter().zip(mail) {
1174 println!("Mail\n---");
1175
1176 println!("From: expected {:?}, got {:?}", fo, fr);
1177 assert_eq!(fo.map(|e| Email::parse_bracketed(e).unwrap()), fr);
1178
1179 let to = to
1180 .iter()
1181 .map(|e| Email::parse_bracketed(e).unwrap())
1182 .collect::<Vec<_>>();
1183 println!("To: expected {:?}, got {:?}", to, tr);
1184 assert_eq!(to, tr);
1185
1186 println!("Expected text: {:?}", show_bytes(co));
1187 println!("Got text : {:?}", show_bytes(&cr));
1188 assert_eq!(co, &cr[..]);
1189 }
1190 }
1191 }
1192
1193 #[test]
1195 fn interrupted_data() {
1196 let inp: &[u8] = b"MAIL FROM:foo\r\n\
1197 RCPT TO:bar\r\n\
1198 DATA\r\n\
1199 hello";
1200 let cfg = Arc::new(TestConfig {
1201 mails: Arc::new(Mutex::new(Vec::new())),
1202 });
1203 let (inp_pipe_r, mut inp_pipe_w) = piper::pipe(1024 * 1024);
1204 let (_out_pipe_r, out_pipe_w) = piper::pipe(1024 * 1024);
1205 let io = Duplex::new(inp_pipe_r, out_pipe_w);
1206 let err_kind = executor::block_on(async move {
1207 inp_pipe_w
1208 .write_all(inp)
1209 .await
1210 .expect("writing to input pipe");
1211 std::mem::drop(inp_pipe_w);
1212 interact(io, IsAlreadyTls::No, (), cfg)
1213 .await
1214 .expect_err("calling interact")
1215 .kind()
1216 });
1217 assert_eq!(err_kind, io::ErrorKind::ConnectionAborted,);
1218 }
1219
1220 #[test]
1222 fn no_stack_overflow() {
1223 let inp: &[u8] =
1224 b"\r\n\r\n\r\n\r\n\r\n\r\n\r\n\r\n\r\n\r\n\r\n\n\r\n\n\r\n\r\n\r\n\r\n\r\n\n\r\n\n\r\
1225 \r\n\r\n\r\n\r\n\r\n\r\n\r\n\r\n\r\n\r\n\r\n\n\r\n\n\r\n\r\n\r\n\r\n\r\n\n\r\n\n\r\
1226 \r\n\r\n\r\n\r\n\r\n\r\n\r\n\r\n\r\n\r\n\r\n\n\r\n\n\r\n\r\n\r\n\r\n\r\n\n\r\n\n\r\
1227 \r\n\r\n\r\n\r\n\r\n\r\n\r\n\r\n\r\n\r\n\r\n\n\r\n\n\r\n\r\n\r\n\r\n\r\n\n\r\n\n\r\
1228 \r\n\r\n\r\n\r\n\r\n\r\n\r\n\r\n\r\n\r\n\r\n\n\r\n\n\r\n\r\n\r\n\r\n\r\n\n\r\n\n\r\
1229 \r\n\r\n\r\n\r\n\r\n\r\n\r\n\r\n\r\n\r\n\r\n\n\r\n\n\r\n\r\n\r\n\r\n\r\n\n\r\n\n\r\
1230 \r\n\r\n\r\n\r\n\r\n\r\n\r\n\r\n\r\n\r\n\r\n\n\r\n\n\r\n\r\n\r\n\r\n\r\n\n\r\n\n\r\
1231 \r\n\r\n\r\n\r\n\r\n\r\n\r\n\r\n\r\n\r\n\r\n\n\r\n\n\r\n\r\n\r\n\r\n\r\n\n\r\n\n\r\
1232 \r\n\r\n\r\n\r\n\r\n\r\n\r\n\r\n\r\n\r\n\r\n\n\r\n\n\r\n\r\n\r\n\r\n\r\n\n\r\n\n\r\
1233 \r\n\r\n\r\n\r\n\r\n\r\n\r\n\r\n\r\n\r\n\r\n\n\r\n\n\r\n\r\n\r\n\r\n\r\n\n\r\n\n\r\
1234 \r\n\r\n\r\n\r\n\r\n\r\n\r\n\r\n\r\n\r\n\r\n\n\r\n\n\r\n\r\n\r\n\r\n\r\n\n\r\n\n\r\
1235 \r\n\r\n\r\n\r\n\r\n\r\n\r\n\r\n\r\n\r\n\r\n\n\r\n\n\r\n\r\n\r\n\r\n\r\n\n\r\n\n\r\
1236 \r\n\r\n\r\n\r\n\r\n\r\n\r\n\r\n\r\n\r\n\r\n\n\r\n\n\r\n\r\n\r\n\r\n\r\n\n\r\n\n\r\
1237 \r\n\r\n\r\n\r\n\r\n\r\n\r\n\r\n\r\n\r\n\r\n\n\r\n\n\r\n\r\n\r\n\r\n\r\n\n\r\n\n\r\
1238 \r\n\r\n\r\n\r\n\r\n\r\n\r\n\r\n\r\n\r\n\r\n\n\r\n\n\r\n\r\n\r\n\r\n\r\n\n\r\n\n\r\
1239 \r\n\r\n\r\n\r\n\r\n\r\n\r\n\r\n\r\n\r\n\r\n\n\r\n\n\r\n\r\n\r\n\r\n\r\n\n\r\n\n\r\
1240 \r\n\r\n\r\n\r\n\r\n\r\n\r\n\r\n\r\n\r\n\r\n\n\r\n\n\r\n\r\n\r\n\r\n\r\n\n\r\n\n\r\
1241 \r\n\r\n\r\n\r\n\r\n\r\n\r\n\r\n\r\n\r\n\r\n\n\r\n\n\r\n\r\n\r\n\r\n\r\n\n\r\n\n\r\
1242 \r\n\r\n\r\n\r\n\r\n\r\n\r\n\r\n\r\n\r\n\r\n\n\r\n\n\r\n\r\n\r\n\r\n\r\n\n\r\n\n\r\
1243 \r\n\r\n\r\n\r\n\r\n\r\n\r\n\r\n\r\n\r\n\r\n\n\r\n\n\r\n\r\n\r\n\r\n\r\n\n\r\n\n\r\
1244 \r\n\r\n\r\n\r\n\r\n\r\n\r\n\r\n\r\n\r\n\r\n\n\r\n\n\r\n\r\n\r\n\r\n\r\n\n\r\n\n\r\
1245 \r\n\r\n\r\n\r\n\r\n\r\n\r\n\r\n\r\n\r\n\r\n\n\r\n\n\r\n\r\n\r\n\r\n\r\n\n\r\n\n\r\
1246 \r\n\r\n\r\n\r\n\r\n\r\n\r\n\r\n\r\n\r\n\r\n\n\r\n\n\r\n\r\n\r\n\r\n\r\n\n\r\n\n\r\
1247 \r\n\r\n\r\n\r\n\r\n\r\n\r\n\r\n\r\n\r\n\r\n\n\r\n\n\r\n\r\n\r\n\r\n\r\n\n\r\n\n\r\
1248 \r\n\r\n\r\n\r\n\r\n\r\n\r\n\r\n\r\n\r\n\r\n\n\r\n\n\r\n\r\n\r\n\r\n\r\n\n\r\n\n\r\
1249 \r\n\r\n\r\n\r\n\r\n\r\n\r\n\r\n\r\n\r\n\r\n\n\r\n\n\r\n\r\n\r\n\r\n\r\n\n\r\n\n\r\
1250 \r\n\r\n\r\n\r\n\r\n\r\n\r\n\r\n\r\n\r\n\r\n\n\r\n\n\r\n\r\n\r\n\r\n\r\n\n\r\n\n\r\
1251 \r\n\r\n\r\n\r\n\r\n\r\n\r\n\r\n\r\n\r\n\r\n\n\r\n\n\r\n\r\n\r\n\r\n\r\n\n\r\n\n\r\
1252 \r\n\r\n\r\n\r\n\r\n\r\n\r\n\r\n\r\n\r\n\r\n\n\r\n\n\r\n\r\n\r\n\r\n\r\n\n\r\n\n\r\
1253 \r\n\r\n\r\n\r\n\r\n\r\n\r\n\r\n\r\n\r\n\r\n\n\r\n\n\r\n\r\n\r\n\r\n\r\n\n\r\n\n\r\
1254 \r\n\r\n\r\n\r\n\r\n\r\n\r\n\r\n\r\n\r\n\r\n\n\r\n\n\r\n\r\n\r\n\r\n\r\n\n\r\n\n\r\
1255 \r\n\r\n\r\n\r\n\r\n\r\n\r\n\r\n\r\n\r\n\r\n\n\r\n\n\r\n\r\n\r\n\r\n\r\n\n\r\n\n\r\
1256 \r\n\r\n\r\n\r\n\r\n\r\n\r\n\r\n\r\n\r\n\r\n\n\r\n\n\r\n\r\n\r\n\r\n\r\n\n\r\n\n\r\
1257 \r\n\r\n\r\n\r\n\r\n\r\n\r\n\r\n\r\n\r\n\r\n\n\r\n\n\r\n\r\n\r\n\r\n\r\n\n\r\n\n\r\
1258 \r\n\r\n\r\n\r\n\r\n\r\n\r\n\r\n\r\n\r\n\r\n\n\r\n\n\r\n\r\n\r\n\r\n\r\n\n\r\n\n\r\
1259 \r\n\r\n\r\n\r\n\r\n\r\n\r\n\r\n\r\n\r\n\r\n\n\r\n\n\r\n\r\n\r\n\r\n\r\n\n\r\n\n\r\
1260 \r\n\r\n\r\n\r\n\r\n\r\n\r\n\r\n\r\n\r\n\r\n\n\r\n\n\r\n\r\n\r\n\r\n\r\n\n\r\n\n\r\
1261 \r\n\r\n\r\n\r\n\r\n\r\n\r\n\r\n\r\n\r\n\r\n\n\r\n\n\r\n\r\n\r\n\r\n\r\n\n\r\n\n\r\
1262 \r\n\r\n\r\n\r\n\r\n\r\n\r\n\r\n\r\n\r\n\r\n\n\r\n\n\r\n\r\n\r\n\r\n\r\n\n\r\n\n\r\
1263 \r\n\r\n\r\n\r\n\r\n\r\n\r\n\r\n\r\n\r\n\r\n\n\r\n\n\r\n\r\n\r\n\r\n\r\n\n\r\n\n\r\
1264 \r\n\r\n\r\n\r\n\r\n\r\n\r\n\r\n\r\n\r\n\r\n\n\r\n\n\r\n\r\n\r\n\r\n\r\n\n\r\n\n\r\
1265 \r\n\r\n\r\n\r\n\r\n\r\n\r\n\r\n\r\n\r\n\r\n\n\r\n\n\r\n\r\n\r\n\r\n\r\n\n\r\n\n\r\
1266 \r\n\r\n\r\n\r\n\r\n\r\n\r\n\r\n\r\n\r\n\r\n\n\r\n\n\r\n\r\n\r\n\r\n\r\n\n\r\n\n\r\
1267 \r\n\r\n\r\n\r\n\r\n\r\n\r\n\r\n\r\n\r\n\r\n\n\r\n\n\r\n\r\n\r\n\r\n\r\n\n\r\n\n\r\
1268 \r\n\r\n\r\n\r\n\r\n\r\n\r\n\r\n\r\n\r\n\r\n\n\r\n\n\r\n\r\n\r\n\r\n\r\n\n\r\n\n\r\
1269 \r\n\r\n\r\n\r\n\r\n\r\n\r\n\r\n\r\n\r\n\r\n\n\r\n\n\r\n\r\n\r\n\r\n\r\n\n\r\n\n\r\
1270 \r\n\r\n\r\n\r\n\r\n\r\n\r\n\r\n\r\n\r\n\r\n\n\r\n\n\r\n\r\n\r\n\r\n\r\n\n\r\n\n\r\
1271 \r\n\r\n\r\n\r\n\r\n\r\n\r\n\r\n\r\n\r\n\r\n\n\r\n\n\r\n\r\n\r\n\r\n\r\n\n\r\n\n\r\
1272 \r\n\r\n\r\n\r\n\r\n\r\n\r\n\r\n\r\n\r\n\r\n\n\r\n\n\r\n\r\n\r\n\r\n\r\n\n\r\n\n\r\
1273 \r\n\r\n\r\n\r\n\r\n\r\n\r\n\r\n\r\n\r\n\r\n\n\r\n\n\r\n\r\n\r\n\r\n\r\n\n\r\n\n\r\
1274 \r\n\r\n\r\n\r\n\r\n\r\n\r\n\r\n\r\n\r\n\r\n\n\r\n\n\r\n\r\n\r\n\r\n\r\n\n\r\n\r\n";
1275 let cfg = Arc::new(TestConfig {
1276 mails: Arc::new(Mutex::new(Vec::new())),
1277 });
1278 let (inp_pipe_r, mut inp_pipe_w) = piper::pipe(1024 * 1024);
1279 let (_out_pipe_r, out_pipe_w) = piper::pipe(1024 * 1024);
1280 let io = Duplex::new(inp_pipe_r, out_pipe_w);
1281 executor::block_on(async move {
1282 inp_pipe_w
1283 .write_all(inp)
1284 .await
1285 .expect("writing to input pipe");
1286 std::mem::drop(inp_pipe_w);
1287 interact(io, IsAlreadyTls::No, (), cfg)
1288 .await
1289 .expect("calling interact");
1290 });
1291 }
1292
1293 struct MinBoundsIo;
1294 impl !Sync for MinBoundsIo {}
1295 impl AsyncRead for MinBoundsIo {
1296 fn poll_read(
1297 self: std::pin::Pin<&mut Self>,
1298 _: &mut std::task::Context<'_>,
1299 _: &mut [u8],
1300 ) -> std::task::Poll<std::result::Result<usize, std::io::Error>> {
1301 unimplemented!()
1302 }
1303 }
1304 impl AsyncWrite for MinBoundsIo {
1305 fn poll_write(
1306 self: std::pin::Pin<&mut Self>,
1307 _: &mut std::task::Context<'_>,
1308 _: &[u8],
1309 ) -> std::task::Poll<std::result::Result<usize, std::io::Error>> {
1310 unimplemented!()
1311 }
1312
1313 fn poll_flush(
1314 self: std::pin::Pin<&mut Self>,
1315 _: &mut std::task::Context<'_>,
1316 ) -> std::task::Poll<std::result::Result<(), std::io::Error>> {
1317 unimplemented!()
1318 }
1319
1320 fn poll_close(
1321 self: std::pin::Pin<&mut Self>,
1322 _: &mut std::task::Context<'_>,
1323 ) -> std::task::Poll<std::result::Result<(), std::io::Error>> {
1324 unimplemented!()
1325 }
1326 }
1327
1328 fn assert_send<T: Send>(_: T) {}
1329
1330 #[test]
1331 fn interact_is_send() {
1332 let cfg = Arc::new(TestConfig {
1333 mails: Arc::new(Mutex::new(Vec::new())),
1334 });
1335 assert_send(interact(MinBoundsIo, IsAlreadyTls::No, (), cfg));
1336 }
1337}