1
  2
  3
  4
  5
  6
  7
  8
  9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
//! Provides the `smtp_chain` macro and the `chain` function.
//!
//! See their respective documentation for more information.
use futures::future::{self, Either, Future, Loop};
use std::io as std_io;
use std::sync::Arc;

use crate::{command, error::LogicError, BoxedCmd, Connection};

/// Creates a chain of commands and sends them through the given connection.
///
/// This will call `boxed()` on every command while putting them
/// into a vector, which is then passed to `chain` together with the
/// connection and the error handler given after `with`.
///
/// The command list may be empty and may end with a trailing comma.
///
/// # Example
///
/// ```no_run
/// # extern crate futures;
/// # #[macro_use] extern crate new_tokio_smtp;
/// use futures::future::{self, Future};
/// use new_tokio_smtp::chain::OnError;
/// use new_tokio_smtp::{command, Connection, ReversePath, ForwardPath};
///
///
/// let fut = future
///     ::lazy(|| mock_create_connection())
///     .and_then(|con| smtp_chain!(con with OnError::StopAndReset => [
///         command::Mail::new(
///             ReversePath::from_unchecked("test@sender.test")),
///         command::Recipient::new(
///             ForwardPath::from_unchecked("test@receiver.test")),
///         command::Data::from_buf(concat!(
///             "Date: Thu, 14 Jun 2018 11:22:18 +0000\r\n",
///             "From: Sendu <test@sender.test>\r\n",
///             "\r\n",
///             "...\r\n"
///         ))
///     ]))
///     .and_then(|(con, smtp_chain_result)| {
///         if let Err((at_idx, err)) = smtp_chain_result {
///             println!("server says no on the cmd with index {}: {}", at_idx, err)
///         }
///         con.quit()
///     });
///
/// // ... these operations use tokio, so make sure there is
/// //     a running tokio instance/runtime/event loop
/// mock_run_with_tokio(fut);
///
/// # // some mock-up, for this example to compile
/// # fn mock_create_connection() -> Result<Connection, ::std::io::Error>
/// #  { unimplemented!() }
/// # fn mock_run_with_tokio(f: impl Future) { unimplemented!() }
///
/// ```
#[macro_export]
macro_rules! smtp_chain {
    // `$(,)*` (rather than `$(,)?`) accepts a trailing comma while staying usable on
    // compilers that predate the `?` macro repetition operator.
    ($con:ident with $oer:expr => [$($cmd:expr),* $(,)*]) => ({
        // `boxed()` is a method of the `Cmd` trait, so the trait has to be in scope
        // at the expansion site.
        use $crate::Cmd;
        $crate::chain::chain($con, vec![$($cmd.boxed()),*], $oer)
    });
}

/// implement this trait for custom error in chain handling
///
/// e.g. a smtp allows failing some of the `RCPT` command in
/// a single mail transaction. The default handling won't allow
/// this but a custom implementation could.
pub trait HandleErrorInChain: Send + Sync + 'static {
    type Fut: Future<Item = (Connection, bool), Error = std_io::Error> + Send;

    /// handle the error on given connection for the `msg_idx`-ed command
    /// given the given logic error
    fn handle_error(&self, con: Connection, msg_idx: usize, logic_error: &LogicError) -> Self::Fut;
}

/// Sends all commands in `chain` through the given connection, one
/// after another.
///
/// Whenever a command results in a `LogicError`, `on_error` is asked how to
/// proceed. If it answers "stop", no further commands are sent and the
/// error is reported together with the index of the failing command.
/// Otherwise the error is dropped and the next command is sent.
///
/// # Returns
///
/// A future resolving to the connection and either
///
/// - `Ok(())` if the end of the chain was reached, or
/// - `Err((index, error))` with the zero-based `index` of the command
///   at which the chain was stopped.
///
/// # Errors
///
/// The future fails with a `std::io::Error` if sending a command or
/// handling an error fails with one.
pub fn chain<H>(
    con: Connection,
    chain: Vec<BoxedCmd>,
    on_error: H,
) -> impl Future<Item = (Connection, Result<(), (usize, LogicError)>), Error = std_io::Error> + Send
where
    H: HandleErrorInChain,
{
    let shared_handler = Arc::new(on_error);
    // Yields every command exactly once, in order, together with its index.
    let mut pending = chain.into_iter().enumerate();

    future::loop_fn(con, move |con| match pending.next() {
        Some((index, next_cmd)) => {
            //FIXME[rust/co-routines+self-borrow]: this is likely not needed with self borrow
            // The future created below cannot borrow from this closure, so it gets
            // its own handle to the error handler.
            let handler = Arc::clone(&shared_handler);
            let fut = con
                .send(next_cmd)
                .and_then(move |(con, result)| match result {
                    Ok(_result) => Either::A(future::ok(Loop::Continue(con))),
                    Err(err) => {
                        let fut = handler
                            .handle_error(con, index, &err)
                            .map(move |(con, stop)| {
                                if stop {
                                    Loop::Break((con, Err((index, err))))
                                } else {
                                    Loop::Continue(con)
                                }
                            });
                        Either::B(fut)
                    }
                });

            Either::A(fut)
        }
        // Every command was sent, the chain is done.
        None => Either::B(future::ok(Loop::Break((con, Ok(()))))),
    })
}

/// Default strategies for handling an error inside of a command chain.
///
/// Both strategies stop sending further commands. They only differ in
/// whether `RSET` is sent as well, which stops the current mail
/// transaction.
#[derive(Clone, Copy, Debug, PartialEq, Eq, Hash)]
pub enum OnError {
    /// Send `RSET` and then stop sending commands.
    StopAndReset,
    /// Just stop sending commands.
    Stop,
}

impl HandleErrorInChain for OnError {
    //FIXME[rust/impl Trait for associated type]: use impl Trait/abstract type
    type Fut = Box<dyn Future<Item = (Connection, bool), Error = std_io::Error> + Send>;
    //type Fut = impl Future<Item=(Connection, bool), Error=std_io::Error> + Send;

    fn handle_error(&self, con: Connection, _msg_idx: usize, _error: &LogicError) -> Self::Fut {
        let fut = match *self {
            OnError::Stop => Either::A(future::ok((con, true))),
            OnError::StopAndReset => {
                let fut = con
                    .send(command::Reset)
                    //Note: Reset wont reach (con, Err(_)), ever! a reset error is turned
                    // into a io::Error
                    .map(|(con, _)| (con, true));

                Either::B(fut)
            }
        };

        Box::new(fut)
    }
}