1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
use std::{io as std_io};

use bytes::{Buf, IntoBuf};
use futures::future::{self, Either, Future};
use futures::stream::{self, Stream};
use future_ext::ResultWithContextExt;

use ::{ExecFuture, Cmd, Io, EhloData};
use ::response::codes;
use ::error::{LogicError, MissingCapabilities};


pub struct Data<S> {
    //TODO add parameter support
    source: S
}

impl<BF> Data<stream::Once<BF, std_io::Error>>
    where BF: Buf
{
    pub fn from_buf<B: IntoBuf<Buf=BF>>(buf: B) -> Self {
        Data::new(stream::once(Ok(buf.into_buf())))
    }
}

impl<S> Data<S>
    where S: Stream<Error=std_io::Error>, S::Item: Buf
{
    pub fn new(source: S) -> Self {
        Data { source }
    }
}

impl<S: 'static> Cmd for Data<S>
    where S: Stream<Error=std_io::Error> + Send, S::Item: Buf
{

    fn check_cmd_availability(&self, _caps: Option<&EhloData>)
        -> Result<(), MissingCapabilities>
    {
        Ok(())
    }

    fn exec(self, io: Io) -> ExecFuture {
        let Data { source } = self;

        let fut = io
            .flush_line_from_parts(&["DATA"])
            .and_then(Io::parse_response)
            .ctx_and_then(move |io, response| {
                if response.code() != codes::START_MAIL_DATA {
                    return Either::A(future::ok((io, Err(LogicError::UnexpectedCode(response)))));
                }

                let fut = io
                    .write_dot_stashed(source)
                    .and_then(Io::parse_response);

                Either::B(fut)
            });

        Box::new(fut)
    }

}