#![doc = include_str!("../Readme.md")]
mod codec;
#[cfg(feature = "_fuzzing")]
pub mod fuzzing;
use std::{ops::Deref, sync::Arc};
use asynchronous_codec::Framed;
use futures::{AsyncRead, AsyncWrite, SinkExt, StreamExt};
use miltr_utils::debug;
use paste::paste;
use thiserror::Error;
#[cfg(feature = "tracing")]
use tracing::{instrument, Level};
use miltr_common::{
actions::{Abort, Action, Quit},
commands::{
Body, Command, Connect, Data, EndOfBody, EndOfHeader, Header, Helo, Mail, Recipient,
Unknown,
},
decoding::ServerCommand,
modifications::{ModificationAction, ModificationResponse},
optneg::{CompatibilityError, OptNeg},
ProtocolError,
};
use self::codec::MilterCodec;
/// Entry point for opening milter connections.
///
/// Holds the option-negotiation settings this client offers and a codec that
/// is cloned for every connection created through it.
pub struct Client {
/// Options offered to the server during option negotiation.
options: Arc<OptNeg>,
/// Codec template; each new connection gets its own clone.
codec: MilterCodec,
}
/// A single milter session on top of the transport `RW`.
///
/// Values of this type are produced after option negotiation has completed.
pub struct Connection<RW: AsyncRead + AsyncWrite + Unpin> {
/// The transport wrapped in the milter codec; used for all sends and receives.
framed: Framed<RW, MilterCodec>,
/// Options in effect for this session; consulted to decide whether a command
/// is sent at all and whether a response is awaited.
options: OptNeg,
}
impl Client {
    /// Create a client that offers `options` during option negotiation.
    #[must_use]
    pub fn new(options: OptNeg) -> Self {
        // NOTE(review): 1 << 16 (= 65536) is presumably a size limit for the
        // codec — confirm against `MilterCodec::new`.
        Self {
            codec: MilterCodec::new(1_usize << 16),
            options: Arc::new(options),
        }
    }

    /// Run the option-negotiation exchange on `framed`.
    ///
    /// Sends this client's options, waits for the server's `OptNeg` reply and
    /// merges both into the options used for the session.
    ///
    /// # Errors
    /// - [`ResponseError::MissingServerResponse`] if the stream ends before a reply arrives.
    /// - [`ResponseError::Unexpected`] if the reply is anything other than `OptNeg`.
    /// - Send/decode failures and incompatible options are forwarded as their
    ///   respective [`ResponseError`] variants.
    async fn recv_option_negotiation<RW: AsyncRead + AsyncWrite + Unpin>(
        &self,
        framed: &mut Framed<RW, MilterCodec>,
    ) -> Result<OptNeg, ResponseError> {
        // The conversion into a wire command consumes its input, so work on a copy.
        let offered = self.options.deref().clone();
        framed.send(&offered.into()).await?;

        let frame = framed
            .next()
            .await
            .ok_or(ResponseError::MissingServerResponse)?;
        let server_options = match frame? {
            ServerCommand::OptNeg(optneg) => optneg,
            command => return Err(ResponseError::Unexpected(command)),
        };

        server_options
            .merge_compatible(&self.options)
            .map_err(ResponseError::from)
    }

    /// Negotiate options over `connection` and return the ready-to-use [`Connection`].
    ///
    /// # Errors
    /// Returns a [`ResponseError`] if option negotiation fails.
    pub async fn connect_via<RW: AsyncRead + AsyncWrite + Unpin>(
        &self,
        connection: RW,
    ) -> Result<Connection<RW>, ResponseError> {
        let mut framed = Framed::new(connection, self.codec.clone());
        let options = self.recv_option_negotiation(&mut framed).await?;
        Ok(Connection { framed, options })
    }
}
// Generates one `pub async fn` per command type. The method name is the
// snake_case form of the type name, and every generated method funnels into
// `self.send_command(..)`.
//
// Two forms are accepted, each optionally preceded by attributes (e.g. doc
// comments) that are copied onto the generated method:
//   `(into) Type` – the method takes any `C: Into<Type>`.
//   `(new) Type`  – the method takes no argument and uses `Type` as a value.
macro_rules! command {
    // Command built from caller-supplied data.
    (
        $(#[$attr:meta])*
        (into) $name:ident
    ) => {
        paste! {
            $(#[$attr])*
            pub async fn [<$name:snake>]<C: Into<[<$name:camel>]>>(
                &mut self,
                command: C,
            ) -> Result<(), ResponseError> {
                let payload: [<$name:camel>] = command.into();
                let wrapped: Command = payload.into();
                self.send_command(wrapped).await
            }
        }
    };
    // Command that carries no caller-supplied data.
    (
        $(#[$attr:meta])*
        (new) $name:ident
    ) => {
        paste! {
            $(#[$attr])*
            pub async fn [<$name:snake>](&mut self) -> Result<(), ResponseError> {
                let wrapped: Command = [<$name:camel>].into();
                self.send_command(wrapped).await
            }
        }
    };
}
impl<RW: AsyncRead + AsyncWrite + Unpin> Connection<RW> {
    command!(
        /// Send a [`Connect`] command.
        ///
        /// # Errors
        /// Fails if sending fails or the server does not answer with `Continue`.
        (into) Connect
    );
    command!(
        /// Send a [`Helo`] command.
        ///
        /// # Errors
        /// Fails if sending fails or the server does not answer with `Continue`.
        (into) Helo
    );
    command!(
        /// Send a [`Mail`] command.
        ///
        /// # Errors
        /// Fails if sending fails or the server does not answer with `Continue`.
        (into) Mail
    );
    command!(
        /// Send a [`Recipient`] command.
        ///
        /// # Errors
        /// Fails if sending fails or the server does not answer with `Continue`.
        (into) Recipient
    );
    command!(
        /// Send the [`Data`] command.
        ///
        /// # Errors
        /// Fails if sending fails or the server does not answer with `Continue`.
        (new) Data
    );
    command!(
        /// Send a [`Header`] command.
        ///
        /// # Errors
        /// Fails if sending fails or the server does not answer with `Continue`.
        (into) Header
    );
    command!(
        /// Send the [`EndOfHeader`] command.
        ///
        /// # Errors
        /// Fails if sending fails or the server does not answer with `Continue`.
        (new) EndOfHeader
    );
    command!(
        /// Send a [`Body`] command.
        ///
        /// # Errors
        /// Fails if sending fails or the server does not answer with `Continue`.
        (into) Body
    );

    /// Send `EndOfBody` and collect the server's verdict.
    ///
    /// Every modification action received is accumulated; the first regular
    /// action ends the exchange and is returned together with the collected
    /// modifications. Unlike the other commands, this one is always sent and
    /// always waits for replies, independent of the negotiated options.
    ///
    /// # Errors
    /// Fails if sending or receiving fails, the stream ends early, or the
    /// server replies with a command that is neither an action nor a
    /// modification action.
    pub async fn end_of_body(&mut self) -> Result<ModificationResponse, ResponseError> {
        let command: Command = EndOfBody.into();
        self.framed.send(&command.into()).await?;

        let mut builder = ModificationResponse::builder();
        loop {
            let answer = self.receive_answer().await?;
            match CommandType::try_from(answer)? {
                CommandType::ModificationAction(modification) => {
                    builder.push(modification);
                }
                CommandType::Action(action) => return Ok(builder.build(action)),
            }
        }
    }

    /// Receive the next server reply and classify it as a [`CommandType`].
    ///
    /// # Errors
    /// Fails if receiving fails, the stream ends, or the reply cannot be
    /// classified.
    pub async fn modification(&mut self) -> Result<CommandType, ResponseError> {
        self.receive_answer()
            .await
            .and_then(CommandType::try_from)
    }

    /// Send a `Quit` action, consuming the connection.
    ///
    /// # Errors
    /// Fails if the action could not be sent.
    pub async fn quit(mut self) -> Result<(), ProtocolError> {
        self.framed.send(&Action::Quit(Quit).into()).await?;
        Ok(())
    }

    /// Not implemented yet.
    ///
    /// # Errors
    /// Currently never returns.
    ///
    /// # Panics
    /// Always panics, as the body is a `todo!`.
    pub fn quit_nc(self) -> Result<(), ProtocolError> {
        todo!("Quit_NC Not yet implemented")
    }

    /// Send an `Abort` action, consuming the connection.
    ///
    /// # Errors
    /// Fails if the action could not be sent.
    pub async fn abort(mut self) -> Result<(), ProtocolError> {
        self.framed.send(&Action::from(Abort).into()).await?;
        Ok(())
    }

    command!(
        /// Send an [`Unknown`] command.
        ///
        /// # Errors
        /// Fails if sending fails or the server does not answer with `Continue`.
        (into) Unknown
    );

    /// Send `command`, honouring the negotiated options.
    ///
    /// The command is dropped without any I/O if the options say it should not
    /// be sent. Otherwise it is sent and, unless the options say no response
    /// is expected, a `Continue` reply is awaited.
    #[cfg_attr(feature = "tracing", instrument(level = Level::DEBUG, skip(self), fields(%command), err))]
    async fn send_command(&mut self, command: Command) -> Result<(), ResponseError> {
        if self.options.protocol.should_skip_send(&command) {
            debug!("Skip sending");
            return Ok(());
        }

        // Must be decided up front: `command` is consumed by the send below.
        let awaits_reply = !self.options.protocol.should_skip_response(&command);

        debug!("Sending command");
        self.framed.send(&command.into()).await?;

        if awaits_reply {
            self.expect_continue().await
        } else {
            debug!("Skip receiving response");
            Ok(())
        }
    }

    /// Read the next decoded server command from the stream.
    ///
    /// A stream that has ended is reported as
    /// [`ResponseError::MissingServerResponse`].
    async fn receive_answer(&mut self) -> Result<ServerCommand, ResponseError> {
        match self.framed.next().await {
            Some(frame) => Ok(frame?),
            None => Err(ResponseError::MissingServerResponse),
        }
    }

    /// Read the next server command and require it to be `Continue`.
    ///
    /// Any other command is returned inside [`ResponseError::Unexpected`].
    async fn expect_continue(&mut self) -> Result<(), ResponseError> {
        match self.receive_answer().await? {
            ServerCommand::Continue(_) => Ok(()),
            unexpected => Err(ResponseError::Unexpected(unexpected)),
        }
    }
}
#[derive(Debug, Error)]
pub enum ResponseError {
#[error(transparent)]
ProtocolError(#[from] ProtocolError),
#[error("Server did not respond to a query")]
MissingServerResponse,
#[error("Server respond with an unexpected answer")]
Unexpected(ServerCommand),
#[error(transparent)]
CompatibilityError(#[from] CompatibilityError),
}
/// Classification of a server reply into the two kinds this client handles.
// NOTE(review): this public type derives nothing (not even `Debug`); consider
// deriving `Debug` once it is confirmed that both payload types implement it.
pub enum CommandType {
/// A regular action (e.g. continue, reject, tempfail).
Action(Action),
/// A request to modify the message (e.g. add a header, replace the body).
ModificationAction(ModificationAction),
}
impl TryFrom<ServerCommand> for CommandType {
type Error = ResponseError;
fn try_from(value: ServerCommand) -> Result<Self, Self::Error> {
match value {
ServerCommand::OptNeg(value) => Err(ResponseError::Unexpected(value.into())),
ServerCommand::Abort(value) => Ok(Self::Action(value.into())),
ServerCommand::Continue(value) => Ok(Self::Action(value.into())),
ServerCommand::Discard(value) => Ok(Self::Action(value.into())),
ServerCommand::Reject(value) => Ok(Self::Action(value.into())),
ServerCommand::Tempfail(value) => Ok(Self::Action(value.into())),
ServerCommand::Skip(value) => Ok(Self::Action(value.into())),
ServerCommand::Replycode(value) => Ok(Self::Action(value.into())),
ServerCommand::AddRecipient(value) => Ok(Self::ModificationAction(value.into())),
ServerCommand::DeleteRecipient(value) => Ok(Self::ModificationAction(value.into())),
ServerCommand::ReplaceBody(value) => Ok(Self::ModificationAction(value.into())),
ServerCommand::AddHeader(value) => Ok(Self::ModificationAction(value.into())),
ServerCommand::InsertHeader(value) => Ok(Self::ModificationAction(value.into())),
ServerCommand::ChangeHeader(value) => Ok(Self::ModificationAction(value.into())),
ServerCommand::Quarantine(value) => Ok(Self::ModificationAction(value.into())),
}
}
}