miltr_client/
lib.rs

1#![doc = include_str!("../Readme.md")]
2
3mod codec;
4
5#[cfg(feature = "_fuzzing")]
6pub mod fuzzing;
7
8use std::{ops::Deref, sync::Arc};
9
10use asynchronous_codec::Framed;
11use futures::{AsyncRead, AsyncWrite, SinkExt, StreamExt};
12use miltr_utils::debug;
13use paste::paste;
14use thiserror::Error;
15#[cfg(feature = "tracing")]
16use tracing::{instrument, Level};
17
18use miltr_common::{
19    actions::{Abort, Action, Quit},
20    commands::{
21        Body, Command, Connect, Data, EndOfBody, EndOfHeader, Header, Helo, Mail, Recipient,
22        Unknown,
23    },
24    decoding::ServerCommand,
25    modifications::{ModificationAction, ModificationResponse},
26    optneg::{CompatibilityError, OptNeg},
27    ProtocolError,
28};
29
30use self::codec::MilterCodec;
31
32/// A milter client using some options and a codec to talk to a milter server
33pub struct Client {
34    options: Arc<OptNeg>,
35    codec: MilterCodec,
36}
37
38/// A single milter connection
39///
40/// This can be created by calling [`Client::connect_via`] to establish
41/// a milter session.
42///
43/// A regular session could use these commands in order:
44///
45/// - [`Connection::connect`]
46/// - [`Connection::helo`]
47/// - [`Connection::mail`]
48/// - [`Connection::recipient`]
49/// - [`Connection::data`]
50/// - [`Connection::header`] (multiple)
51/// - [`Connection::end_of_header`]
52/// - [`Connection::body`] (multiple)
53/// - [`Connection::end_of_body`]
54///
55/// Be careful about the ordering of these commands, milter implementations
56/// are designed to expect them in order they appear in the SMTP protocol.
57///
58/// # Protocol from `OptNeg`
59///
60/// Depending on what was set by client and server during option negotiation
61/// when establishing the connection, commands might either not be sent at all
62/// or no response is awaited.
63///
64/// Assuming [`Protocol::NO_HELO`](miltr_common::optneg::Protocol::NO_HELO) is
65/// set during option negotiation, calling [`Connection::helo`] short-circuits
66/// to `return Ok(())`.
67///
68/// If [`Protocol::NR_HELO`](miltr_common::optneg::Protocol::NR_HELO) is set,
69/// calling [`Connection::helo`] does not wait for an answer from the milter
70/// server, it immediately `return Ok(())` after sending the command.
71///
72/// Commands behave differently here, see the implementations for
73/// [`Protocol::skip_send`](miltr_common::optneg::Protocol::should_skip_send) and
74/// [`Protocol::skip_response`](miltr_common::optneg::Protocol::should_skip_response)
75/// for details.
76pub struct Connection<RW: AsyncRead + AsyncWrite + Unpin> {
77    framed: Framed<RW, MilterCodec>,
78    options: OptNeg,
79}
80
81impl Client {
82    /// Create a client which is able to handle connections with the provided
83    /// options.
84    #[must_use]
85    pub fn new(options: OptNeg) -> Self {
86        let codec = MilterCodec::new(2_usize.pow(16));
87
88        Self {
89            options: Arc::new(options),
90            codec,
91        }
92    }
93
94    /// Option negotiate with the server
95    ///
96    /// The steps are:
97    /// 1. Send our options to the server
98    /// 2. Receive it's options back
99    /// 3. Merge them into one
100    async fn recv_option_negotiation<RW: AsyncRead + AsyncWrite + Unpin>(
101        &self,
102        framed: &mut Framed<RW, MilterCodec>,
103    ) -> Result<OptNeg, ResponseError> {
104        let client_options = &self.options;
105        framed.send(&client_options.deref().clone().into()).await?;
106
107        let resp = framed
108            .next()
109            .await
110            .ok_or(ResponseError::MissingServerResponse)??;
111
112        let server_options = match resp {
113            ServerCommand::OptNeg(optneg) => Ok(optneg),
114            command => Err(ResponseError::Unexpected(command)),
115        }?;
116
117        let options = server_options.merge_compatible(&self.options)?;
118
119        Ok(options)
120    }
121
122    /// Handle a single milter connection via the provided RW connection
123    ///
124    /// # Errors
125    /// This fails if an io-error is experienced or option negotiation fails
126    pub async fn connect_via<RW: AsyncRead + AsyncWrite + Unpin>(
127        &self,
128        connection: RW,
129    ) -> Result<Connection<RW>, ResponseError> {
130        let codec = self.codec.clone();
131        let mut framed = Framed::new(connection, codec);
132        let options = self.recv_option_negotiation(&mut framed).await?;
133
134        let connection = Connection { framed, options };
135
136        Ok(connection)
137    }
138}
139
// Generates one public, async command method on `Connection`.
//
// The method is named after the snake_case form of the given variant and
// forwards the resulting `Command` to `send_command`. Attributes (including
// doc comments) placed in front of the variant are attached to the method.
//
// Two forms exist:
// - `(into) Variant`: the method accepts anything that is `Into<Variant>`
// - `(new) Variant`: the method has no argument, the unit value `Variant`
//   itself is converted into the `Command`
macro_rules! command {
    (
        $(#[$outer:meta])*
        (into) $variant:ident
    ) => {
        paste! {
            $(#[$outer])*
            pub async fn [<$variant:snake>]<C: Into<[<$variant:camel>]>>(&mut self, command: C) -> Result<(), ResponseError> {
                // Caller type -> concrete command type -> `Command`
                let typed: [<$variant:camel>] = command.into();
                let wrapped: Command = typed.into();

                self.send_command(wrapped).await
            }
        }
    };
    (
        $(#[$outer:meta])*
        (new) $variant:ident
    ) => {
        paste! {
            $(#[$outer])*
            pub async fn [<$variant:snake>](&mut self) -> Result<(), ResponseError> {
                // The parameter type of `send_command` selects `Command` as target
                self.send_command([<$variant:camel>].into()).await
            }
        }
    };
}
169
170impl<RW: AsyncRead + AsyncWrite + Unpin> Connection<RW> {
171    command!(
172        /// Send connect information.
173        ///
174        ///
175        /// # Errors
176        /// Errors on any response from the milter server that is not Continue
177        (into) Connect
178    );
179
180    command!(
181        /// Handle a client helo
182        ///
183        ///
184        /// # Errors
185        /// Errors on any response from the milter server that is not Continue
186        (into) Helo
187    );
188
189    command!(
190        /// Send the sender info
191        ///
192        ///
193        /// # Errors
194        /// Errors on any response from the milter server that is not Continue
195        (into) Mail
196    );
197
198    command!(
199        /// Send the recipient info
200        ///
201        ///
202        /// # Errors
203        /// Errors on any response from the milter server that is not Continue
204        (into) Recipient
205    );
206
207    command!(
208        /// Indicate that data follows
209        ///
210        ///
211        /// # Errors
212        /// Errors on any response from the milter server that is not Continue
213        (new) Data
214    );
215
216    command!(
217        /// Send headers
218        ///
219        ///
220        /// # Errors
221        /// Errors on any response from the milter server that is not Continue
222        (into) Header
223    );
224
225    command!(
226        /// Indicate all headers have been sent
227        ///
228        ///
229        /// # Errors
230        /// Errors on any response from the milter server that is not Continue
231        (new) EndOfHeader
232    );
233
234    command!(
235        /// Send a body part
236        ///
237        ///
238        /// # Errors
239        /// Errors on any response from the milter server that is not Continue
240        (into) Body
241    );
242
243    // command!(
244    //     /// Indicate all body parts have been sent
245    //     ///
246    //     /// # Errors
247    //     /// Errors on any response from the milter server that is not Continue
248    //     (new) EndOfBody
249    // );
250
251    /// Indicate all body parts have been sent
252    ///
253    /// # Errors
254    /// Errors on any response from the milter server that is not Continue
255    pub async fn end_of_body(&mut self) -> Result<ModificationResponse, ResponseError> {
256        // First, send the eob command
257        let command: Command = EndOfBody.into();
258        self.framed.send(&command.into()).await?;
259
260        let mut modification_response_builder = ModificationResponse::builder();
261        loop {
262            // Receive a response from the server
263            let answer = self.receive_answer().await?;
264
265            // Convert it to a command type
266            let command: CommandType = answer.try_into()?;
267
268            match command {
269                CommandType::Action(action) => {
270                    return Ok(modification_response_builder.build(action));
271                }
272                CommandType::ModificationAction(action) => {
273                    modification_response_builder.push(action);
274                }
275            }
276        }
277    }
278
279    /// Receive all modification requests from the server
280    ///
281    /// # Errors
282    /// Errors on error regarding server communication
283    pub async fn modification(&mut self) -> Result<CommandType, ResponseError> {
284        let resp = self.receive_answer().await?;
285
286        CommandType::try_from(resp)
287    }
288
289    /// Ask for a graceful connection shutdown
290    ///
291    /// # Errors
292    /// Errors on io or codec Errors
293    pub async fn quit(mut self) -> Result<(), ProtocolError> {
294        self.framed.send(&Action::Quit(Quit).into()).await?;
295
296        Ok(())
297    }
298
299    /// Ask to re-use this connection for a new mail
300    ///
301    /// # Errors
302    /// Errors on any response from the milter server that is not Continue
303    pub fn quit_nc(self) -> Result<(), ProtocolError> {
304        todo!("Quit_NC Not yet implemented")
305    }
306
307    /// Abort processing for the current mail
308    ///
309    /// # Errors
310    /// Errors on io or codec Errors
311    pub async fn abort(mut self) -> Result<(), ProtocolError> {
312        self.framed.send(&Action::from(Abort).into()).await?;
313
314        Ok(())
315    }
316
317    command!(
318        /// Send an unknown command to the server.
319        ///
320        ///
321        /// # Errors
322        /// Errors on io or codec Errors
323        (into) Unknown
324    );
325
326    /// Send a command to the server respecting protocol settings
327    #[cfg_attr(feature = "tracing", instrument(level = Level::DEBUG, skip(self), fields(%command), err))]
328    async fn send_command(&mut self, command: Command) -> Result<(), ResponseError> {
329        // Eval skips
330        if self.options.protocol.should_skip_send(&command) {
331            debug!("Skip sending");
332            return Ok(());
333        }
334        let skip_response = self.options.protocol.should_skip_response(&command);
335
336        // Send it
337        debug!("Sending command");
338        self.framed.send(&command.into()).await?;
339
340        // Check response
341        if skip_response {
342            debug!("Skip receiving response");
343            return Ok(());
344        }
345        self.expect_continue().await
346    }
347
348    /// Shortcut to fetch an answer from the server
349    async fn receive_answer(&mut self) -> Result<ServerCommand, ResponseError> {
350        let resp = self
351            .framed
352            .next()
353            .await
354            .ok_or(ResponseError::MissingServerResponse)??;
355
356        Ok(resp)
357    }
358    /// Shortcut expect a Continue answer from the server
359    async fn expect_continue(&mut self) -> Result<(), ResponseError> {
360        // Receive back answer
361        let resp = self.receive_answer().await?;
362
363        // If continue, just continue. Otherwise return an error
364        match resp {
365            ServerCommand::Continue(_c) => Ok(()),
366            command => Err(ResponseError::Unexpected(command)),
367        }
368    }
369}
370
371/// An error for all problems the client could experience
372#[derive(Debug, Error)]
373pub enum ResponseError {
374    /// Anything protocol related
375    #[error(transparent)]
376    ProtocolError(#[from] ProtocolError),
377    /// If there should have been a response
378    #[error("Server did not respond to a query")]
379    MissingServerResponse,
380    /// If there was a response but it was the wrong one
381    #[error("Server respond with an unexpected answer")]
382    Unexpected(ServerCommand),
383    /// If we have a protocol compatibility issue
384    #[error(transparent)]
385    CompatibilityError(#[from] CompatibilityError),
386}
387
388/// The types of commands the server may respond with
389pub enum CommandType {
390    /// A regular control flow action
391    Action(Action),
392    /// A data modification action
393    ModificationAction(ModificationAction),
394}
395
396impl TryFrom<ServerCommand> for CommandType {
397    type Error = ResponseError;
398
399    fn try_from(value: ServerCommand) -> Result<Self, Self::Error> {
400        match value {
401            ServerCommand::OptNeg(value) => Err(ResponseError::Unexpected(value.into())),
402            ServerCommand::Abort(value) => Ok(Self::Action(value.into())),
403            ServerCommand::Continue(value) => Ok(Self::Action(value.into())),
404            ServerCommand::Discard(value) => Ok(Self::Action(value.into())),
405            ServerCommand::Reject(value) => Ok(Self::Action(value.into())),
406            ServerCommand::Tempfail(value) => Ok(Self::Action(value.into())),
407            ServerCommand::Skip(value) => Ok(Self::Action(value.into())),
408            ServerCommand::Replycode(value) => Ok(Self::Action(value.into())),
409            ServerCommand::AddRecipient(value) => Ok(Self::ModificationAction(value.into())),
410            ServerCommand::DeleteRecipient(value) => Ok(Self::ModificationAction(value.into())),
411            ServerCommand::ReplaceBody(value) => Ok(Self::ModificationAction(value.into())),
412            ServerCommand::AddHeader(value) => Ok(Self::ModificationAction(value.into())),
413            ServerCommand::InsertHeader(value) => Ok(Self::ModificationAction(value.into())),
414            ServerCommand::ChangeHeader(value) => Ok(Self::ModificationAction(value.into())),
415            ServerCommand::Quarantine(value) => Ok(Self::ModificationAction(value.into())),
416        }
417    }
418}