1#![doc = include_str!("../Readme.md")]
2
3mod codec;
4mod milter;
5
6#[cfg(feature = "_fuzzing")]
7pub mod fuzzing;
8
9use asynchronous_codec::Framed;
10pub use milter::{Error, Milter};
11
12use futures::{AsyncRead, AsyncWrite, Future, SinkExt, StreamExt};
13use miltr_common::{
14 actions::Action,
15 decoding::ClientCommand,
16 encoding::ServerMessage,
17 optneg::{Capability, OptNeg},
18};
19use miltr_utils::debug;
20#[cfg(feature = "tracing")]
21use tracing::instrument;
22
23pub(crate) use self::codec::MilterCodec;
24
25#[derive(Debug)]
27pub struct Server<'m, M: Milter> {
28 milter: &'m mut M,
29 codec: MilterCodec,
30 quit_on_abort: bool,
31}
32
33impl<'m, M: Milter> Server<'m, M> {
34 pub fn new(milter: &'m mut M, quit_on_abort: bool, max_buffer_size: usize) -> Self {
36 let codec = MilterCodec::new(max_buffer_size);
37 Self {
38 milter,
39 codec,
40 quit_on_abort,
41 }
42 }
43
44 #[must_use]
67 pub fn default_postfix(milter: &'m mut M) -> Self {
68 Self::new(milter, false, 2_usize.pow(16))
69 }
70
71 #[cfg_attr(feature = "tracing", instrument(skip_all))]
83 pub async fn handle_connection<RW: AsyncRead + AsyncWrite + Unpin + Send>(
84 &mut self,
85 socket: RW,
86 ) -> Result<(), Error<M::Error>> {
87 let mut framed = Framed::new(socket, &mut self.codec);
88
89 let mut options: Option<OptNeg> = Option::None;
90
91 while let Some(command) = framed.next().await {
92 let command = command?;
93 debug!("Received {}", command);
94
95 match command {
96 ClientCommand::Helo(helo) => {
98 Self::notify_respond_answer(self.milter.helo(helo), &mut framed).await?;
99 }
100 ClientCommand::Connect(connect) => {
101 Self::notify_respond_answer(self.milter.connect(connect), &mut framed).await?;
102 }
103 ClientCommand::Mail(mail) => {
104 Self::notify_respond_answer(self.milter.mail(mail), &mut framed).await?;
105 }
106 ClientCommand::Recipient(rcpt) => {
107 Self::notify_respond_answer(self.milter.rcpt(rcpt), &mut framed).await?;
108 }
109 ClientCommand::Data(_v) => {
110 Self::notify_respond_answer(self.milter.data(), &mut framed).await?;
111 }
112 ClientCommand::Header(header) => {
113 Self::notify_respond_answer(self.milter.header(header), &mut framed).await?;
114 }
115 ClientCommand::EndOfHeader(_v) => {
116 Self::notify_respond_answer(self.milter.end_of_header(), &mut framed).await?;
117 }
118 ClientCommand::Body(body) => {
119 Self::notify_respond_answer(self.milter.body(body), &mut framed).await?;
120 }
121 ClientCommand::Unknown(unknown) => {
122 Self::notify_respond_answer(self.milter.unknown(unknown), &mut framed).await?;
123 }
124 ClientCommand::EndOfBody(_v) => {
126 let mut responses = self
128 .milter
129 .end_of_body()
130 .await
131 .map_err(Error::from_app_error)?;
132
133 responses.filter_mods_by_caps(
136 options
137 .as_ref()
138 .map_or(Capability::all(), |o| o.capabilities),
139 );
140
141 let responses: Vec<ServerMessage> = responses.into();
143 for response in responses {
144 debug!("Sending response");
145 framed.send(&response).await?;
146 }
147 }
148 ClientCommand::Macro(macro_) => {
149 self.milter
150 .macro_(macro_)
151 .await
152 .map_err(Error::from_app_error)?;
153 }
154
155 ClientCommand::OptNeg(opt_neg) => {
158 let response = self.milter.option_negotiation(opt_neg).await?;
159 options = Some(response.clone());
160 framed.send(&response.into()).await?;
161 }
162 ClientCommand::Abort(_v) => {
164 self.milter.abort().await.map_err(Error::from_app_error)?;
165
166 if self.quit_on_abort {
167 self.milter.quit().await.map_err(Error::from_app_error)?;
168 return Ok(());
169 }
170 }
171 ClientCommand::Quit(_v) => {
173 self.milter.quit().await.map_err(Error::from_app_error)?;
174 return Ok(());
175 }
176 ClientCommand::QuitNc(_v) => {
178 self.milter.quit_nc().await.map_err(Error::from_app_error)?;
179 }
180 }
181 }
182 Ok(())
183 }
184
185 async fn notify_respond_answer<RW: AsyncRead + AsyncWrite + Unpin>(
187 milter_fn: impl Future<Output = Result<impl Into<Action>, M::Error>>,
188 framed: &mut Framed<RW, &mut MilterCodec>,
189 ) -> Result<(), milter::Error<M::Error>> {
190 let response = milter_fn.await.map_err(Error::from_app_error)?;
191 let response: Action = response.into();
192
193 framed.send(&response.into()).await?;
194 Ok(())
195 }
196}