1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
use std::io as std_io;
use bytes::{Buf, IntoBuf};
use futures::{
future::{self, Either, Future},
stream::{self, Stream},
};
use crate::{
error::{LogicError, MissingCapabilities},
future_ext::ResultWithContextExt,
response::codes,
Cmd, EhloData, ExecFuture, Io,
};
pub struct Data<S> {
source: S,
}
impl<BF> Data<stream::Once<BF, std_io::Error>>
where
BF: Buf,
{
pub fn from_buf<B: IntoBuf<Buf = BF>>(buf: B) -> Self {
Data::new(stream::once(Ok(buf.into_buf())))
}
}
impl<S> Data<S>
where
S: Stream<Error = std_io::Error>,
S::Item: Buf,
{
pub fn new(source: S) -> Self {
Data { source }
}
}
impl<S: 'static> Cmd for Data<S>
where
S: Stream<Error = std_io::Error> + Send,
S::Item: Buf,
{
fn check_cmd_availability(&self, _caps: Option<&EhloData>) -> Result<(), MissingCapabilities> {
Ok(())
}
fn exec(self, io: Io) -> ExecFuture {
let Data { source } = self;
let fut = io
.flush_line_from_parts(&["DATA"])
.and_then(Io::parse_response)
.ctx_and_then(move |io, response| {
if response.code() != codes::START_MAIL_DATA {
return Either::A(future::ok((io, Err(LogicError::UnexpectedCode(response)))));
}
let fut = io.write_dot_stashed(source).and_then(Io::parse_response);
Either::B(fut)
});
Box::new(fut)
}
}