1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
use crate::{
common::*,
smtp::{ApplyCommand, CodecControl, MailBodyChunk, MailBodyEnd, SmtpSessionCommand, SmtpState},
};
use super::{ESMTPCommand, Rfc5321};
impl<B: AsRef<[u8]> + Sync + Send + fmt::Debug> SmtpSessionCommand
for ESMTPCommand<MailBodyChunk<B>>
{
fn verb(&self) -> &str {
""
}
fn apply(&self, state: SmtpState) -> S2Fut<SmtpState> {
Rfc5321::apply_cmd(&self.instruction, state)
}
}
impl<B: AsRef<[u8]> + Sync + Send + fmt::Debug> ApplyCommand<MailBodyChunk<B>> for Rfc5321 {
fn apply_cmd(data: &MailBodyChunk<B>, mut state: SmtpState) -> S2Fut<SmtpState> {
if state.transaction.sink.is_none() {
return Box::pin(ready(state));
}
let sink = state
.transaction
.sink
.take()
.expect("Checked presence above");
let mailid = state.transaction.id.clone();
let fut = async move {
let mut write_all = WriteAll {
from: data.0.as_ref(),
to: Box::pin(sink),
};
match (&mut write_all).await {
Ok(()) => {
let WriteAll { to, .. } = write_all;
state.transaction.sink = Some(to);
}
Err(e) => {
warn!("Failed to write mail data for {} - {}", mailid, e);
state.reset();
}
};
state
};
Box::pin(fut)
}
}
impl SmtpSessionCommand for ESMTPCommand<MailBodyEnd> {
fn verb(&self) -> &str {
""
}
fn apply(&self, state: SmtpState) -> S2Fut<SmtpState> {
Rfc5321::apply_cmd(&self.instruction, state)
}
}
impl ApplyCommand<MailBodyEnd> for Rfc5321 {
fn apply_cmd(_data: &MailBodyEnd, mut state: SmtpState) -> S2Fut<SmtpState> {
if state.transaction.sink.is_none() {
return Box::pin(ready(state));
}
let mut sink = state
.transaction
.sink
.take()
.expect("Checked presence above");
let mailid = state.transaction.id.clone();
let fut = async move {
if match poll_fn(move |cx| sink.as_mut().poll_close(cx)).await {
Ok(()) => true,
Err(e) if e.kind() == std::io::ErrorKind::NotConnected => true,
Err(e) => {
warn!("Failed to close mail {}: {}", mailid, e);
false
}
} {
state.say_mail_queued(mailid.as_str());
} else {
state.say_mail_queue_failed_temporarily();
}
state.reset();
state.say(CodecControl::Parser(
state.service.get_parser_for_commands(),
));
state
};
Box::pin(fut)
}
}
struct WriteAll<'a, W> {
pub from: &'a [u8],
pub to: Pin<Box<W>>,
}
impl<W> Future for WriteAll<'_, W>
where
W: Write,
{
type Output = std::io::Result<()>;
fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
let this = &mut *self;
while !this.from.is_empty() {
let n = match Pin::new(&mut this.to).poll_write(cx, this.from)? {
Poll::Pending => return Poll::Pending,
Poll::Ready(len) => len,
};
{
let (_, rest) = std::mem::replace(&mut this.from, &[]).split_at(n);
this.from = rest;
}
if n == 0 {
return Poll::Ready(Err(std::io::ErrorKind::WriteZero.into()));
}
}
Poll::Ready(Ok(()))
}
}