1#![doc = include_str!("../Readme.md")]
2
3mod codec;
4
5#[cfg(feature = "_fuzzing")]
6pub mod fuzzing;
7
8use std::{ops::Deref, sync::Arc};
9
10use asynchronous_codec::Framed;
11use futures::{AsyncRead, AsyncWrite, SinkExt, StreamExt};
12use miltr_utils::debug;
13use paste::paste;
14use thiserror::Error;
15#[cfg(feature = "tracing")]
16use tracing::{instrument, Level};
17
18use miltr_common::{
19 actions::{Abort, Action, Quit},
20 commands::{
21 Body, Command, Connect, Data, EndOfBody, EndOfHeader, Header, Helo, Mail, Recipient,
22 Unknown,
23 },
24 decoding::ServerCommand,
25 modifications::{ModificationAction, ModificationResponse},
26 optneg::{CompatibilityError, OptNeg},
27 ProtocolError,
28};
29
30use self::codec::MilterCodec;
31
32pub struct Client {
34 options: Arc<OptNeg>,
35 codec: MilterCodec,
36}
37
38pub struct Connection<RW: AsyncRead + AsyncWrite + Unpin> {
77 framed: Framed<RW, MilterCodec>,
78 options: OptNeg,
79}
80
81impl Client {
82 #[must_use]
85 pub fn new(options: OptNeg) -> Self {
86 let codec = MilterCodec::new(2_usize.pow(16));
87
88 Self {
89 options: Arc::new(options),
90 codec,
91 }
92 }
93
94 async fn recv_option_negotiation<RW: AsyncRead + AsyncWrite + Unpin>(
101 &self,
102 framed: &mut Framed<RW, MilterCodec>,
103 ) -> Result<OptNeg, ResponseError> {
104 let client_options = &self.options;
105 framed.send(&client_options.deref().clone().into()).await?;
106
107 let resp = framed
108 .next()
109 .await
110 .ok_or(ResponseError::MissingServerResponse)??;
111
112 let server_options = match resp {
113 ServerCommand::OptNeg(optneg) => Ok(optneg),
114 command => Err(ResponseError::Unexpected(command)),
115 }?;
116
117 let options = server_options.merge_compatible(&self.options)?;
118
119 Ok(options)
120 }
121
122 pub async fn connect_via<RW: AsyncRead + AsyncWrite + Unpin>(
127 &self,
128 connection: RW,
129 ) -> Result<Connection<RW>, ResponseError> {
130 let codec = self.codec.clone();
131 let mut framed = Framed::new(connection, codec);
132 let options = self.recv_option_negotiation(&mut framed).await?;
133
134 let connection = Connection { framed, options };
135
136 Ok(connection)
137 }
138}
139
140macro_rules! command {
141 (
142 $(#[$outer:meta])*
143 (into) $variant:ident
144 ) => {
145 paste! {
146 $(#[$outer])*
147 pub async fn [<$variant:snake>]<C: Into<[<$variant:camel>]>>(&mut self, command: C) -> Result<(), ResponseError> {
148 let command_intoed: [<$variant:camel>] = command.into();
149 let command: Command = command_intoed.into();
150
151 self.send_command(command).await
152 }
153 }
154 };
155 (
156 $(#[$outer:meta])*
157 (new) $variant:ident
158 ) => {
159 paste! {
160 $(#[$outer])*
161 pub async fn [<$variant:snake>](&mut self) -> Result<(), ResponseError> {
162 let command: Command = [<$variant:camel>].into();
163
164 self.send_command(command).await
165 }
166 }
167 };
168}
169
170impl<RW: AsyncRead + AsyncWrite + Unpin> Connection<RW> {
171 command!(
172 (into) Connect
178 );
179
180 command!(
181 (into) Helo
187 );
188
189 command!(
190 (into) Mail
196 );
197
198 command!(
199 (into) Recipient
205 );
206
207 command!(
208 (new) Data
214 );
215
216 command!(
217 (into) Header
223 );
224
225 command!(
226 (new) EndOfHeader
232 );
233
234 command!(
235 (into) Body
241 );
242
243 pub async fn end_of_body(&mut self) -> Result<ModificationResponse, ResponseError> {
256 let command: Command = EndOfBody.into();
258 self.framed.send(&command.into()).await?;
259
260 let mut modification_response_builder = ModificationResponse::builder();
261 loop {
262 let answer = self.receive_answer().await?;
264
265 let command: CommandType = answer.try_into()?;
267
268 match command {
269 CommandType::Action(action) => {
270 return Ok(modification_response_builder.build(action));
271 }
272 CommandType::ModificationAction(action) => {
273 modification_response_builder.push(action);
274 }
275 }
276 }
277 }
278
279 pub async fn modification(&mut self) -> Result<CommandType, ResponseError> {
284 let resp = self.receive_answer().await?;
285
286 CommandType::try_from(resp)
287 }
288
289 pub async fn quit(mut self) -> Result<(), ProtocolError> {
294 self.framed.send(&Action::Quit(Quit).into()).await?;
295
296 Ok(())
297 }
298
299 pub fn quit_nc(self) -> Result<(), ProtocolError> {
304 todo!("Quit_NC Not yet implemented")
305 }
306
307 pub async fn abort(mut self) -> Result<(), ProtocolError> {
312 self.framed.send(&Action::from(Abort).into()).await?;
313
314 Ok(())
315 }
316
317 command!(
318 (into) Unknown
324 );
325
326 #[cfg_attr(feature = "tracing", instrument(level = Level::DEBUG, skip(self), fields(%command), err))]
328 async fn send_command(&mut self, command: Command) -> Result<(), ResponseError> {
329 if self.options.protocol.should_skip_send(&command) {
331 debug!("Skip sending");
332 return Ok(());
333 }
334 let skip_response = self.options.protocol.should_skip_response(&command);
335
336 debug!("Sending command");
338 self.framed.send(&command.into()).await?;
339
340 if skip_response {
342 debug!("Skip receiving response");
343 return Ok(());
344 }
345 self.expect_continue().await
346 }
347
348 async fn receive_answer(&mut self) -> Result<ServerCommand, ResponseError> {
350 let resp = self
351 .framed
352 .next()
353 .await
354 .ok_or(ResponseError::MissingServerResponse)??;
355
356 Ok(resp)
357 }
358 async fn expect_continue(&mut self) -> Result<(), ResponseError> {
360 let resp = self.receive_answer().await?;
362
363 match resp {
365 ServerCommand::Continue(_c) => Ok(()),
366 command => Err(ResponseError::Unexpected(command)),
367 }
368 }
369}
370
371#[derive(Debug, Error)]
373pub enum ResponseError {
374 #[error(transparent)]
376 ProtocolError(#[from] ProtocolError),
377 #[error("Server did not respond to a query")]
379 MissingServerResponse,
380 #[error("Server respond with an unexpected answer")]
382 Unexpected(ServerCommand),
383 #[error(transparent)]
385 CompatibilityError(#[from] CompatibilityError),
386}
387
388pub enum CommandType {
390 Action(Action),
392 ModificationAction(ModificationAction),
394}
395
396impl TryFrom<ServerCommand> for CommandType {
397 type Error = ResponseError;
398
399 fn try_from(value: ServerCommand) -> Result<Self, Self::Error> {
400 match value {
401 ServerCommand::OptNeg(value) => Err(ResponseError::Unexpected(value.into())),
402 ServerCommand::Abort(value) => Ok(Self::Action(value.into())),
403 ServerCommand::Continue(value) => Ok(Self::Action(value.into())),
404 ServerCommand::Discard(value) => Ok(Self::Action(value.into())),
405 ServerCommand::Reject(value) => Ok(Self::Action(value.into())),
406 ServerCommand::Tempfail(value) => Ok(Self::Action(value.into())),
407 ServerCommand::Skip(value) => Ok(Self::Action(value.into())),
408 ServerCommand::Replycode(value) => Ok(Self::Action(value.into())),
409 ServerCommand::AddRecipient(value) => Ok(Self::ModificationAction(value.into())),
410 ServerCommand::DeleteRecipient(value) => Ok(Self::ModificationAction(value.into())),
411 ServerCommand::ReplaceBody(value) => Ok(Self::ModificationAction(value.into())),
412 ServerCommand::AddHeader(value) => Ok(Self::ModificationAction(value.into())),
413 ServerCommand::InsertHeader(value) => Ok(Self::ModificationAction(value.into())),
414 ServerCommand::ChangeHeader(value) => Ok(Self::ModificationAction(value.into())),
415 ServerCommand::Quarantine(value) => Ok(Self::ModificationAction(value.into())),
416 }
417 }
418}