1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
use std::{io as std_io};
use bytes::{Buf, IntoBuf};
use futures::future::{self, Either, Future};
use futures::stream::{self, Stream};
use future_ext::ResultWithContextExt;
use ::{ExecFuture, Cmd, Io, EhloData};
use ::response::codes;
use ::error::{LogicError, MissingCapabilities};
pub struct Data<S> {
source: S
}
impl<BF> Data<stream::Once<BF, std_io::Error>>
where BF: Buf
{
pub fn from_buf<B: IntoBuf<Buf=BF>>(buf: B) -> Self {
Data::new(stream::once(Ok(buf.into_buf())))
}
}
impl<S> Data<S>
where S: Stream<Error=std_io::Error>, S::Item: Buf
{
pub fn new(source: S) -> Self {
Data { source }
}
}
impl<S: 'static> Cmd for Data<S>
where S: Stream<Error=std_io::Error> + Send, S::Item: Buf
{
fn check_cmd_availability(&self, _caps: Option<&EhloData>)
-> Result<(), MissingCapabilities>
{
Ok(())
}
fn exec(self, io: Io) -> ExecFuture {
let Data { source } = self;
let fut = io
.flush_line_from_parts(&["DATA"])
.and_then(Io::parse_response)
.ctx_and_then(move |io, response| {
if response.code() != codes::START_MAIL_DATA {
return Either::A(future::ok((io, Err(LogicError::UnexpectedCode(response)))));
}
let fut = io
.write_dot_stashed(source)
.and_then(Io::parse_response);
Either::B(fut)
});
Box::new(fut)
}
}