1
  2
  3
  4
  5
  6
  7
  8
  9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
use crate::{
    common::*,
    smtp::{ApplyCommand, CodecControl, MailBodyChunk, MailBodyEnd, SmtpSessionCommand, SmtpState},
};

use super::{ESMTPCommand, Rfc5321};

impl<B: AsRef<[u8]> + Sync + Send + fmt::Debug> SmtpSessionCommand
    for ESMTPCommand<MailBodyChunk<B>>
{
    fn verb(&self) -> &str {
        ""
    }

    fn apply(&self, state: SmtpState) -> S2Fut<SmtpState> {
        Rfc5321::apply_cmd(&self.instruction, state)
    }
}

impl<B: AsRef<[u8]> + Sync + Send + fmt::Debug> ApplyCommand<MailBodyChunk<B>> for Rfc5321 {
    fn apply_cmd(data: &MailBodyChunk<B>, mut state: SmtpState) -> S2Fut<SmtpState> {
        if state.transaction.sink.is_none() {
            // CheckMe: silence. handle_data_end should respond with error.
            return Box::pin(ready(state));
        }
        let sink = state
            .transaction
            .sink
            .take()
            .expect("Checked presence above");
        let mailid = state.transaction.id.clone();
        let fut = async move {
            let mut write_all = WriteAll {
                from: data.0.as_ref(),
                to: Box::pin(sink),
            };
            match (&mut write_all).await {
                Ok(()) => {
                    let WriteAll { to, .. } = write_all;
                    state.transaction.sink = Some(to);
                }
                Err(e) => {
                    warn!("Failed to write mail data for {} - {}", mailid, e);
                    state.reset();
                    // CheckMe: following this reset, we are not sending any response yet. MailBodyEnd should do that.
                }
            };
            state
        };
        Box::pin(fut)
    }
}

impl SmtpSessionCommand for ESMTPCommand<MailBodyEnd> {
    fn verb(&self) -> &str {
        ""
    }
    fn apply(&self, state: SmtpState) -> S2Fut<SmtpState> {
        Rfc5321::apply_cmd(&self.instruction, state)
    }
}

impl ApplyCommand<MailBodyEnd> for Rfc5321 {
    fn apply_cmd(_data: &MailBodyEnd, mut state: SmtpState) -> S2Fut<SmtpState> {
        if state.transaction.sink.is_none() {
            // CheckMe: silence. handle_data_end should respond with error.
            return Box::pin(ready(state));
        }
        let mut sink = state
            .transaction
            .sink
            .take()
            .expect("Checked presence above");
        let mailid = state.transaction.id.clone();
        let fut = async move {
            if match poll_fn(move |cx| sink.as_mut().poll_close(cx)).await {
                Ok(()) => true,
                Err(e) if e.kind() == std::io::ErrorKind::NotConnected => true,
                Err(e) => {
                    warn!("Failed to close mail {}: {}", mailid, e);
                    false
                }
            } {
                state.say_mail_queued(mailid.as_str());
            } else {
                state.say_mail_queue_failed_temporarily();
            }
            state.reset();
            state.say(CodecControl::Parser(
                state.service.get_parser_for_commands(),
            ));
            state
        };
        Box::pin(fut)
    }
}

struct WriteAll<'a, W> {
    pub from: &'a [u8],
    pub to: Pin<Box<W>>,
}

impl<W> Future for WriteAll<'_, W>
where
    W: Write,
{
    type Output = std::io::Result<()>;

    fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
        let this = &mut *self;
        while !this.from.is_empty() {
            let n = match Pin::new(&mut this.to).poll_write(cx, this.from)? {
                Poll::Pending => return Poll::Pending,
                Poll::Ready(len) => len,
            };
            {
                let (_, rest) = std::mem::replace(&mut this.from, &[]).split_at(n);
                this.from = rest;
            }
            if n == 0 {
                return Poll::Ready(Err(std::io::ErrorKind::WriteZero.into()));
            }
        }

        Poll::Ready(Ok(()))
    }
}