1pub use indymilter::{message::Version, Actions, IntoCString, MacroStage, ProtoOpts, SocketInfo};
67
68use bytes::{Bytes, BytesMut};
69use indymilter::{
70 message::{
71 self,
72 command::{
73 Command, ConnInfoPayload, EnvAddrPayload, HeaderPayload, HeloPayload, MacroPayload,
74 OptNegPayload, UnknownPayload,
75 },
76 reply::Reply,
77 PROTOCOL_VERSION,
78 },
79 EitherStream,
80};
81use std::{
82 cmp,
83 collections::HashMap,
84 error::Error,
85 ffi::{CStr, CString},
86 fmt::{self, Display, Formatter},
87 io,
88 net::{IpAddr, Ipv4Addr, Ipv6Addr, SocketAddr},
89 time::Duration,
90};
91use tokio::{
92 io::{AsyncWriteExt, BufStream},
93 net::{TcpStream, ToSocketAddrs},
94 task, time,
95};
96
97pub type TestResult<T> = Result<T, TestError>;
99
100#[derive(Debug)]
105pub enum TestError {
106 MilterUsage,
110 InvalidReply,
114 Io(io::Error),
116}
117
118impl Display for TestError {
119 fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result {
120 match self {
121 Self::MilterUsage => write!(f, "misuse of the MTA/milter interface"),
122 Self::InvalidReply => write!(f, "milter sent invalid or unexpected reply"),
123 Self::Io(e) => write!(f, "I/O error: {e}"),
124 }
125 }
126}
127
128impl Error for TestError {
129 fn source(&self) -> Option<&(dyn Error + 'static)> {
130 match self {
131 Self::MilterUsage | Self::InvalidReply => None,
132 Self::Io(e) => Some(e),
133 }
134 }
135}
136
137impl From<io::Error> for TestError {
138 fn from(error: io::Error) -> Self {
139 Self::Io(error)
140 }
141}
142
143#[cfg(unix)]
144type Stream = EitherStream<BufStream<TcpStream>, BufStream<tokio::net::UnixStream>>;
145
146#[cfg(not(unix))]
147type Stream = EitherStream<BufStream<TcpStream>, TcpStream>;
148
149#[derive(Clone, Debug)]
151pub struct TestConnectionBuilder {
152 read_timeout: Duration,
153 write_timeout: Duration,
154
155 version: Version,
156 actions: Actions,
157 opts: ProtoOpts,
158
159 exact_leading_space: bool,
160}
161
162impl TestConnectionBuilder {
163 pub fn read_timeout(mut self, value: Duration) -> Self {
165 self.read_timeout = value;
166 self
167 }
168
169 pub fn write_timeout(mut self, value: Duration) -> Self {
171 self.write_timeout = value;
172 self
173 }
174
175 pub fn protocol_version(mut self, value: Version) -> Self {
177 self.version = value;
178 self
179 }
180
181 pub fn available_actions(mut self, value: Actions) -> Self {
183 self.actions = value;
184 self
185 }
186
187 pub fn available_opts(mut self, value: ProtoOpts) -> Self {
189 self.opts = value;
190 self
191 }
192
193 pub fn exact_leading_space(mut self, value: bool) -> Self {
196 self.exact_leading_space = value;
197 self
198 }
199
200 pub async fn open_tcp(self, addr: impl ToSocketAddrs) -> TestResult<TestConnection> {
202 let stream = time::timeout(self.write_timeout, TcpStream::connect(addr))
205 .await
206 .map_err(io::Error::from)??;
207
208 let stream = BufStream::new(stream);
209
210 self.open(Stream::Tcp(stream)).await
211 }
212
213 #[cfg(unix)]
214 pub async fn open_unix(self, addr: impl AsRef<std::path::Path>) -> TestResult<TestConnection> {
218 let stream = time::timeout(self.write_timeout, tokio::net::UnixStream::connect(addr))
219 .await
220 .map_err(io::Error::from)??;
221
222 let stream = BufStream::new(stream);
223
224 self.open(Stream::Unix(stream)).await
225 }
226
227 async fn open(self, stream: Stream) -> TestResult<TestConnection> {
228 let mut conn = TestConnection {
229 stream,
230
231 read_timeout: self.read_timeout,
232 write_timeout: self.write_timeout,
233
234 actions: Default::default(),
236 opts: Default::default(),
237 macros: Default::default(),
238
239 exact_leading_space: self.exact_leading_space,
240 skip_seen: false,
241 };
242
243 conn.write_command(Command::OptNeg(OptNegPayload {
244 version: self.version,
245 actions: self.actions,
246 opts: self.opts,
247 }))
248 .await?;
249
250 let reply = conn.read_reply().await?;
251
252 if let Reply::OptNeg { actions, opts, macros, .. } = reply {
253 conn.actions = actions;
254 conn.opts = opts;
255 conn.macros = macros;
256 } else {
257 return Err(TestError::InvalidReply);
258 }
259
260 Ok(conn)
261 }
262}
263
264#[derive(Debug)]
266pub struct TestConnection {
267 stream: Stream,
268
269 read_timeout: Duration,
270 write_timeout: Duration,
271
272 actions: Actions,
273 opts: ProtoOpts,
274 macros: HashMap<MacroStage, CString>,
275
276 exact_leading_space: bool,
277
278 skip_seen: bool,
281}
282
283impl TestConnection {
284 pub fn configure() -> TestConnectionBuilder {
286 TestConnectionBuilder {
287 read_timeout: Duration::from_secs(30),
288 write_timeout: Duration::from_secs(30),
289 version: PROTOCOL_VERSION,
290 actions: Actions::all(),
291 opts: ProtoOpts::all(),
292 exact_leading_space: false,
293 }
294 }
295
296 pub async fn open(addr: impl ToSocketAddrs) -> TestResult<Self> {
301 Self::configure().open_tcp(addr).await
302 }
303
304 pub fn negotiated_actions(&self) -> Actions {
306 self.actions
307 }
308
309 pub fn negotiated_opts(&self) -> ProtoOpts {
311 self.opts
312 }
313
314 pub fn requested_macros(&self) -> &HashMap<MacroStage, CString> {
316 &self.macros
317 }
318
319 pub async fn connect(
321 &mut self,
322 hostname: impl IntoCString,
323 socket_info: impl IntoSocketInfo,
324 ) -> TestResult<Status> {
325 self.skip_seen = false;
326
327 if self.opts.contains(ProtoOpts::NO_CONNECT) {
328 return Err(TestError::MilterUsage);
329 }
330
331 let hostname = hostname.into_c_string();
332 let socket_info = socket_info.into_socket_info();
333
334 self.write_command(Command::ConnInfo(ConnInfoPayload {
335 hostname,
336 socket_info,
337 }))
338 .await?;
339
340 let status = if self.opts.contains(ProtoOpts::NOREPLY_CONNECT) {
341 Status::Noreply
342 } else {
343 self.read_status().await?
344 };
345
346 Ok(status)
347 }
348
349 pub async fn helo(&mut self, hostname: impl IntoCString) -> TestResult<Status> {
351 self.skip_seen = false;
352
353 if self.opts.contains(ProtoOpts::NO_HELO) {
354 return Err(TestError::MilterUsage);
355 }
356
357 let hostname = hostname.into_c_string();
358
359 self.write_command(Command::Helo(HeloPayload { hostname }))
360 .await?;
361
362 let status = if self.opts.contains(ProtoOpts::NOREPLY_HELO) {
363 Status::Noreply
364 } else {
365 self.read_status().await?
366 };
367
368 Ok(status)
369 }
370
371 pub async fn mail<I, T>(&mut self, args: I) -> TestResult<Status>
373 where
374 I: IntoIterator<Item = T>,
375 T: IntoCString,
376 {
377 self.skip_seen = false;
378
379 if self.opts.contains(ProtoOpts::NO_MAIL) {
380 return Err(TestError::MilterUsage);
381 }
382
383 let args: Vec<_> = args.into_iter().map(|x| x.into_c_string()).collect();
384
385 if args.is_empty() {
386 return Err(TestError::MilterUsage);
387 }
388
389 self.write_command(Command::Mail(EnvAddrPayload { args }))
390 .await?;
391
392 let status = if self.opts.contains(ProtoOpts::NOREPLY_MAIL) {
393 Status::Noreply
394 } else {
395 self.read_status().await?
396 };
397
398 Ok(status)
399 }
400
401 pub async fn rcpt<I, T>(&mut self, args: I) -> TestResult<Status>
403 where
404 I: IntoIterator<Item = T>,
405 T: IntoCString,
406 {
407 self.skip_seen = false;
408
409 if self.opts.contains(ProtoOpts::NO_RCPT) {
410 return Err(TestError::MilterUsage);
411 }
412
413 let args: Vec<_> = args.into_iter().map(|x| x.into_c_string()).collect();
414
415 if args.is_empty() {
416 return Err(TestError::MilterUsage);
417 }
418
419 self.write_command(Command::Rcpt(EnvAddrPayload { args }))
420 .await?;
421
422 let status = if self.opts.contains(ProtoOpts::NOREPLY_RCPT) {
423 Status::Noreply
424 } else {
425 self.read_status().await?
426 };
427
428 Ok(status)
429 }
430
431 pub async fn data(&mut self) -> TestResult<Status> {
433 self.skip_seen = false;
434
435 if self.opts.contains(ProtoOpts::NO_DATA) {
436 return Err(TestError::MilterUsage);
437 }
438
439 self.write_command(Command::Data).await?;
440
441 let status = if self.opts.contains(ProtoOpts::NOREPLY_DATA) {
442 Status::Noreply
443 } else {
444 self.read_status().await?
445 };
446
447 Ok(status)
448 }
449
450 pub async fn header(
452 &mut self,
453 name: impl IntoCString,
454 value: impl IntoCString,
455 ) -> TestResult<Status> {
456 self.skip_seen = false;
457
458 if self.opts.contains(ProtoOpts::NO_HEADER) {
459 return Err(TestError::MilterUsage);
460 }
461
462 let name = name.into_c_string();
463 if name.as_bytes().is_empty() {
464 return Err(TestError::MilterUsage);
465 }
466
467 let mut value = value.into_c_string();
468
469 if !self.exact_leading_space && self.opts.contains(ProtoOpts::LEADING_SPACE) {
474 let mut bytes = value.into_bytes_with_nul();
475 bytes.insert(0, b' ');
476 value = CString::from_vec_with_nul(bytes).unwrap();
477 }
478
479 self.write_command(Command::Header(HeaderPayload { name, value }))
480 .await?;
481
482 let status = if self.opts.contains(ProtoOpts::NOREPLY_HEADER) {
483 Status::Noreply
484 } else {
485 self.read_status().await?
486 };
487
488 Ok(status)
489 }
490
491 pub async fn eoh(&mut self) -> TestResult<Status> {
493 self.skip_seen = false;
494
495 if self.opts.contains(ProtoOpts::NO_EOH) {
496 return Err(TestError::MilterUsage);
497 }
498
499 self.write_command(Command::Eoh).await?;
500
501 let status = if self.opts.contains(ProtoOpts::NOREPLY_EOH) {
502 Status::Noreply
503 } else {
504 self.read_status().await?
505 };
506
507 Ok(status)
508 }
509
510 pub async fn body(&mut self, bytes: impl Into<Bytes>) -> TestResult<Status> {
512 if self.skip_seen {
513 self.skip_seen = false;
514 return Err(TestError::MilterUsage);
515 }
516
517 if self.opts.contains(ProtoOpts::NO_BODY) {
518 return Err(TestError::MilterUsage);
519 }
520
521 let mut bytes = bytes.into();
522
523 if bytes.is_empty() {
524 self.write_command(Command::BodyChunk(Default::default()))
525 .await?;
526
527 let status = if self.opts.contains(ProtoOpts::NOREPLY_BODY) {
528 Status::Noreply
529 } else {
530 let status = self.read_status().await?;
531 if status == Status::Skip {
532 self.skip_seen = true;
533 }
534 status
535 };
536
537 return Ok(status);
538 }
539
540 const CHUNK_SIZE: usize = 65535;
543
544 let mut status = Status::Continue; loop {
547 let next_chunk_len = cmp::min(CHUNK_SIZE, bytes.len());
548 if next_chunk_len == 0 {
549 break;
550 }
551
552 let chunk = bytes.split_to(next_chunk_len);
553
554 self.write_command(Command::BodyChunk(chunk)).await?;
555
556 if self.opts.contains(ProtoOpts::NOREPLY_BODY) {
557 status = Status::Noreply;
558 } else {
559 status = self.read_status().await?;
560 if status == Status::Skip {
561 self.skip_seen = true;
562 }
563 if status != Status::Continue {
564 break;
565 }
566 }
567 }
568
569 Ok(status)
570 }
571
572 pub async fn eom(&mut self) -> TestResult<(EomActions, Status)> {
574 self.skip_seen = false;
575
576 self.write_command(Command::BodyEnd(Default::default()))
577 .await?;
578
579 let mut actions = vec![];
580
581 let final_reply = loop {
582 let reply = self.read_reply().await?;
583
584 match reply {
585 Reply::Accept
586 | Reply::Continue
587 | Reply::Discard
588 | Reply::Reject
589 | Reply::Skip
590 | Reply::Tempfail
591 | Reply::ReplyCode { .. } => {
592 break reply;
593 }
594 reply => {
595 let action = EomAction::from_reply(reply)?;
596 actions.push(action);
597 }
598 }
599 };
600
601 let status = self.make_into_status(final_reply)?;
602
603 let actions = EomActions { actions };
604
605 Ok((actions, status))
606 }
607
608 pub async fn abort(&mut self) -> TestResult<()> {
610 self.skip_seen = false;
611
612 self.write_command(Command::Abort).await?;
613
614 Ok(())
615 }
616
617 pub async fn unknown(&mut self, arg: impl IntoCString) -> TestResult<Status> {
619 self.skip_seen = false;
620
621 if self.opts.contains(ProtoOpts::NO_UNKNOWN) {
622 return Err(TestError::MilterUsage);
623 }
624
625 let arg = arg.into_c_string();
626
627 self.write_command(Command::Unknown(UnknownPayload { arg }))
628 .await?;
629
630 let status = if self.opts.contains(ProtoOpts::NOREPLY_UNKNOWN) {
631 Status::Noreply
632 } else {
633 self.read_status().await?
634 };
635
636 Ok(status)
637 }
638
639 pub async fn close(mut self) -> TestResult<()> {
641 self.write_command(Command::Quit).await?;
642
643 self.stream.shutdown().await?;
644
645 for _ in 0..11 {
656 task::yield_now().await;
657 }
658
659 Ok(())
660 }
661
662 pub async fn macros<I, K, V>(&mut self, stage: MacroStage, macros: I) -> TestResult<()>
664 where
665 I: IntoIterator<Item = (K, V)>,
666 K: IntoCString,
667 V: IntoCString,
668 {
669 self.skip_seen = false;
670
671 let macros: Vec<_> = macros
672 .into_iter()
673 .flat_map(|(k, v)| [k.into_c_string(), v.into_c_string()])
674 .collect();
675
676 if macros.is_empty() {
677 return Err(TestError::MilterUsage);
678 }
679
680 self.write_command(Command::DefMacros(MacroPayload { stage, macros }))
681 .await?;
682
683 Ok(())
684 }
685
686 async fn write_command(&mut self, cmd: Command) -> io::Result<()> {
687 let msg = cmd.into_message();
688
689 let f = message::write(&mut self.stream, msg);
690 time::timeout(self.write_timeout, f).await??;
691
692 Ok(())
693 }
694
695 async fn read_status(&mut self) -> TestResult<Status> {
696 let reply = self.read_reply().await?;
697
698 self.make_into_status(reply)
699 }
700
701 fn make_into_status(&self, reply: Reply) -> TestResult<Status> {
702 let status = Status::from_reply(reply)?;
703
704 if status == Status::Skip && !self.opts.contains(ProtoOpts::SKIP) {
705 return Err(TestError::InvalidReply);
708 }
709
710 Ok(status)
711 }
712
713 async fn read_reply(&mut self) -> TestResult<Reply> {
714 let f = message::read(&mut self.stream);
715 let msg = time::timeout(self.read_timeout, f)
716 .await
717 .map_err(io::Error::from)??;
718
719 let reply = Reply::parse_reply(msg).map_err(|_| TestError::InvalidReply)?;
720
721 Ok(reply)
722 }
723}
724
725pub trait IntoSocketInfo {
727 fn into_socket_info(self) -> SocketInfo;
728}
729
730impl IntoSocketInfo for SocketInfo {
731 fn into_socket_info(self) -> SocketInfo {
732 self
733 }
734}
735
736impl IntoSocketInfo for SocketAddr {
737 fn into_socket_info(self) -> SocketInfo {
738 self.into()
739 }
740}
741
742impl<I: Into<IpAddr>> IntoSocketInfo for (I, u16) {
743 fn into_socket_info(self) -> SocketInfo {
744 let addr = self.0.into();
745 SocketAddr::from((addr, self.1)).into()
746 }
747}
748
749impl IntoSocketInfo for [u8; 4] {
750 fn into_socket_info(self) -> SocketInfo {
751 (self, 0).into_socket_info()
752 }
753}
754
755impl IntoSocketInfo for [u16; 8] {
756 fn into_socket_info(self) -> SocketInfo {
757 (self, 0).into_socket_info()
758 }
759}
760
761impl IntoSocketInfo for [u8; 16] {
762 fn into_socket_info(self) -> SocketInfo {
763 (self, 0).into_socket_info()
764 }
765}
766
767impl IntoSocketInfo for Ipv4Addr {
768 fn into_socket_info(self) -> SocketInfo {
769 (self, 0).into_socket_info()
770 }
771}
772
773impl IntoSocketInfo for Ipv6Addr {
774 fn into_socket_info(self) -> SocketInfo {
775 (self, 0).into_socket_info()
776 }
777}
778
779impl IntoSocketInfo for IpAddr {
780 fn into_socket_info(self) -> SocketInfo {
781 (self, 0).into_socket_info()
782 }
783}
784
785impl IntoSocketInfo for CString {
786 fn into_socket_info(self) -> SocketInfo {
787 self.into()
788 }
789}
790
791impl IntoSocketInfo for &CStr {
792 fn into_socket_info(self) -> SocketInfo {
793 self.into_c_string().into()
794 }
795}
796
797impl IntoSocketInfo for &str {
798 fn into_socket_info(self) -> SocketInfo {
799 self.into_c_string().into()
800 }
801}
802
803#[derive(Clone, Debug, Eq, Hash, PartialEq)]
805pub enum Status {
806 Accept,
807 Continue,
808 Discard,
809 Skip,
810 Reject { message: Option<CString> },
811 Tempfail { message: Option<CString> },
812 Noreply,
813}
814
815impl Status {
816 fn from_reply(reply: Reply) -> Result<Self, TestError> {
817 match reply {
818 Reply::Accept => Ok(Self::Accept),
819 Reply::Continue => Ok(Self::Continue),
820 Reply::Discard => Ok(Self::Discard),
821 Reply::Reject => Ok(Self::Reject { message: None }),
822 Reply::Skip => Ok(Self::Skip),
823 Reply::Tempfail => Ok(Self::Tempfail { message: None }),
824 Reply::ReplyCode { reply } => {
825 if reply.as_bytes().starts_with(&[b'4']) {
826 Ok(Self::Tempfail {
827 message: Some(reply),
828 })
829 } else if reply.as_bytes().starts_with(&[b'5']) {
830 Ok(Self::Reject {
831 message: Some(reply),
832 })
833 } else {
834 Err(TestError::InvalidReply)
835 }
836 }
837 _ => Err(TestError::InvalidReply),
838 }
839 }
840}
841
842#[derive(Clone, Debug, Eq, Hash, PartialEq)]
844pub enum EomAction {
845 AddHeader {
846 name: CString,
847 value: CString,
848 },
849 InsertHeader {
850 index: i32,
851 name: CString,
852 value: CString,
853 },
854 ChangeHeader {
855 name: CString,
856 index: i32,
857 value: CString, },
859 DeleteHeader {
860 name: CString,
861 index: i32,
862 },
863 ChangeSender {
864 mail: CString,
865 args: Option<CString>,
866 },
867 AddRecipient {
868 rcpt: CString,
870 args: Option<CString>,
871 },
872 DeleteRecipient {
873 rcpt: CString,
874 },
875 ReplaceBody {
876 chunk: Bytes,
877 },
878 Progress,
879 Quarantine {
880 reason: CString,
881 },
882}
883
884impl EomAction {
885 fn from_reply(reply: Reply) -> Result<Self, TestError> {
886 match reply {
887 Reply::AddRcpt { rcpt } => Ok(Self::AddRecipient { rcpt, args: None }),
888 Reply::DeleteRcpt { rcpt } => Ok(Self::DeleteRecipient { rcpt }),
889 Reply::AddRcptExt { rcpt, args } => Ok(Self::AddRecipient { rcpt, args }),
890 Reply::ReplaceBody { chunk } => Ok(Self::ReplaceBody { chunk }),
891 Reply::ChangeSender { mail, args } => Ok(Self::ChangeSender { mail, args }),
892 Reply::AddHeader { name, value } => Ok(Self::AddHeader { name, value }),
893 Reply::InsertHeader { index, name, value } => {
894 Ok(Self::InsertHeader { index, name, value })
895 }
896 Reply::ChangeHeader { name, index, value } => {
897 if value.as_bytes().is_empty() {
898 Ok(Self::DeleteHeader { name, index })
899 } else {
900 Ok(Self::ChangeHeader { name, index, value })
901 }
902 }
903 Reply::Progress => Ok(Self::Progress),
904 Reply::Quarantine { reason } => Ok(Self::Quarantine { reason }),
905 _ => Err(TestError::InvalidReply),
906 }
907 }
908}
909
910#[derive(Clone, Debug, Eq, Hash, PartialEq)]
914pub struct EomActions {
915 pub actions: Vec<EomAction>,
916}
917
918impl EomActions {
919 pub fn has_add_header<M1, M2>(&self, mname: M1, mvalue: M2) -> bool
920 where
921 M1: for<'a> Matcher<&'a CStr>,
922 M2: for<'a> Matcher<&'a CStr>,
923 {
924 self.actions.iter().any(|a| {
925 matches!(a, EomAction::AddHeader { name, value }
926 if mname.matches(name) && mvalue.matches(value))
927 })
928 }
929
930 pub fn has_insert_header<M1, M2, M3>(&self, mindex: M1, mname: M2, mvalue: M3) -> bool
931 where
932 M1: Matcher<i32>,
933 M2: for<'a> Matcher<&'a CStr>,
934 M3: for<'a> Matcher<&'a CStr>,
935 {
936 self.actions.iter().any(|a| {
937 matches!(a, EomAction::InsertHeader { index, name, value }
938 if mindex.matches(*index) && mname.matches(name) && mvalue.matches(value))
939 })
940 }
941
942 pub fn has_change_header<M1, M2, M3>(&self, mname: M1, mindex: M2, mvalue: M3) -> bool
943 where
944 M1: for<'a> Matcher<&'a CStr>,
945 M2: Matcher<i32>,
946 M3: for<'a> Matcher<&'a CStr>,
947 {
948 self.actions.iter().any(|a| {
949 matches!(a, EomAction::ChangeHeader { name, index, value }
950 if mname.matches(name) && mindex.matches(*index) && mvalue.matches(value))
951 })
952 }
953
954 pub fn has_delete_header<M1, M2>(&self, mname: M1, mindex: M2) -> bool
955 where
956 M1: for<'a> Matcher<&'a CStr>,
957 M2: Matcher<i32>,
958 {
959 self.actions.iter().any(|a| {
960 matches!(a, EomAction::DeleteHeader { name, index }
961 if mname.matches(name) && mindex.matches(*index))
962 })
963 }
964
965 pub fn has_change_sender<M1, M2>(&self, mmail: M1, margs: M2) -> bool
966 where
967 M1: for<'a> Matcher<&'a CStr>,
968 M2: for<'a> Matcher<Option<&'a CStr>>,
969 {
970 self.actions.iter().any(|a| {
971 matches!(a, EomAction::ChangeSender { mail, args }
972 if mmail.matches(mail) && margs.matches(args.as_deref()))
973 })
974 }
975
976 pub fn has_add_recipient<M1, M2>(&self, mrcpt: M1, margs: M2) -> bool
977 where
978 M1: for<'a> Matcher<&'a CStr>,
979 M2: for<'a> Matcher<Option<&'a CStr>>,
980 {
981 self.actions.iter().any(|a| {
982 matches!(a, EomAction::AddRecipient { rcpt, args }
983 if mrcpt.matches(rcpt) && margs.matches(args.as_deref()))
984 })
985 }
986
987 pub fn has_delete_recipient<M>(&self, mrcpt: M) -> bool
988 where
989 M: for<'a> Matcher<&'a CStr>,
990 {
991 self.actions.iter().any(|a| {
992 matches!(a, EomAction::DeleteRecipient { rcpt } if mrcpt.matches(rcpt))
993 })
994 }
995
996 pub fn has_replaced_body<M>(&self, body: M) -> bool
997 where
998 M: for<'a> Matcher<&'a [u8]>,
999 {
1000 if !self.actions.iter().any(|a| matches!(a, EomAction::ReplaceBody { .. })) {
1001 return false;
1002 }
1003
1004 let mut replaced_body = BytesMut::new();
1005
1006 for a in &self.actions {
1007 if let EomAction::ReplaceBody { chunk } = a {
1008 replaced_body.extend(chunk);
1009 }
1010 }
1011
1012 body.matches(&replaced_body)
1013 }
1014
1015 pub fn has_quarantine<M>(&self, mreason: M) -> bool
1016 where
1017 M: for<'a> Matcher<&'a CStr>,
1018 {
1019 self.actions.iter().any(|a| {
1020 matches!(a, EomAction::Quarantine { reason } if mreason.matches(reason))
1021 })
1022 }
1023}
1024
1025pub trait Matcher<T> {
1027 fn matches(&self, value: T) -> bool;
1028}
1029
1030impl Matcher<i32> for i32 {
1031 fn matches(&self, value: i32) -> bool {
1032 *self == value
1033 }
1034}
1035
1036impl<F> Matcher<i32> for F
1037where
1038 F: Fn(i32) -> bool,
1039{
1040 fn matches(&self, value: i32) -> bool {
1041 (self)(value)
1042 }
1043}
1044
1045impl<'a, 'b> Matcher<&'b CStr> for &'a str {
1046 fn matches(&self, value: &'b CStr) -> bool {
1047 self.as_bytes() == value.to_bytes()
1048 }
1049}
1050
1051impl<'a, 'b> Matcher<Option<&'b CStr>> for &'a str {
1052 fn matches(&self, value: Option<&'b CStr>) -> bool {
1053 match value {
1054 None => false,
1055 Some(value) => self.matches(value),
1056 }
1057 }
1058}
1059
1060impl<'a, 'b> Matcher<Option<&'b CStr>> for Option<&'a str> {
1061 fn matches(&self, value: Option<&'b CStr>) -> bool {
1062 match (self, value) {
1063 (None, None) => true,
1064 (Some(s), Some(value)) => s.matches(value),
1065 _ => false,
1066 }
1067 }
1068}
1069
1070impl<'a, 'b> Matcher<&'b [u8]> for &'a [u8] {
1071 fn matches(&self, value: &'b [u8]) -> bool {
1072 *self == value
1073 }
1074}
1075
1076impl<'a, 'b, const N: usize> Matcher<&'b [u8]> for &'a [u8; N] {
1077 fn matches(&self, value: &'b [u8]) -> bool {
1078 self == &value
1079 }
1080}
1081
1082impl<'a, 'b> Matcher<&'b [u8]> for &'a str {
1083 fn matches(&self, value: &'b [u8]) -> bool {
1084 self.as_bytes() == value
1085 }
1086}
1087
1088#[cfg(feature = "regex")]
1089impl<'a, 'b> Matcher<&'b CStr> for &'a regex::Regex {
1090 fn matches(&self, value: &'b CStr) -> bool {
1091 match value.to_str() {
1092 Ok(s) => self.is_match(s),
1093 Err(_) => false,
1094 }
1095 }
1096}
1097
1098#[cfg(feature = "regex")]
1099impl<'a, 'b> Matcher<Option<&'b CStr>> for &'a regex::Regex {
1100 fn matches(&self, value: Option<&'b CStr>) -> bool {
1101 match value {
1102 None => false,
1103 Some(value) => self.matches(value),
1104 }
1105 }
1106}
1107
1108#[cfg(feature = "regex")]
1109impl<'a, 'b> Matcher<&'b CStr> for &'a regex::bytes::Regex {
1110 fn matches(&self, value: &'b CStr) -> bool {
1111 self.is_match(value.to_bytes())
1112 }
1113}
1114
1115#[cfg(feature = "regex")]
1116impl<'a, 'b> Matcher<Option<&'b CStr>> for &'a regex::bytes::Regex {
1117 fn matches(&self, value: Option<&'b CStr>) -> bool {
1118 match value {
1119 None => false,
1120 Some(value) => self.matches(value),
1121 }
1122 }
1123}
1124
1125#[cfg(feature = "regex")]
1126impl<'a, 'b> Matcher<&'b [u8]> for &'a regex::bytes::Regex {
1127 fn matches(&self, value: &'b [u8]) -> bool {
1128 self.is_match(value)
1129 }
1130}
1131
1132#[derive(Clone, Copy, Debug, Default, Eq, Hash, PartialEq)]
1134pub struct AnyMatcher;
1135
1136impl<T> Matcher<T> for AnyMatcher {
1137 fn matches(&self, _: T) -> bool {
1138 true
1139 }
1140}
1141
1142pub fn any() -> AnyMatcher {
1144 AnyMatcher
1145}