new-tokio-smtp 0.9.1

extendible smtp implementation for tokio
Documentation
//! provides a `MockStream` implementations
use std::cmp::min;
use std::io::{self as std_io, Read, Write};
use std::mem;
use std::thread;
use std::time::Duration;

use rand::{random, thread_rng, Rng};

use bytes::{BufMut, BytesMut};
use futures::sync::mpsc;
use futures::task::{self, Task};
use futures::{future, Async, Future, Poll, Stream};
use tokio::io::{AsyncRead, AsyncWrite};

use crate::io::MockStream;

/// Represents if the action is taken by `Client` or `Server`
#[derive(Debug)]
pub enum Actor {
    Server,
    Client,
}

/// the data send by Client/Server
#[derive(Debug)]
pub enum ActionData {
    /// a number of lines, not containing trailing "\r\n"
    ///
    /// The trailing "\r\n" will be added implicitly
    Lines(Vec<&'static str>),
    /// A blob of bytes
    Blob(Vec<u8>),
}

impl ActionData {
    /// returns the len of the data
    ///
    /// In case of `ActionData::Lines` the implied `"\r\n"` line
    /// endings are added into the length (i.e. len +2 for each line).
    pub fn len(&self) -> usize {
        match self {
            ActionData::Blob(blob) => blob.len(),
            ActionData::Lines(lines) => {
                //MAGIC_NUM: +2 = "\r\n".len()
                lines.iter().map(|ln| ln.len() + 2).sum()
            }
        }
    }

    pub fn assert_same_start(&self, other: &[u8]) {
        match self {
            ActionData::Blob(blob) => {
                let use_len = min(blob.len(), other.len());
                let other = &other[..use_len];
                let blob = &blob[..use_len];
                //TODO better error message (assert_eq is a BAD idea here as
                // it will flood the output)
                if blob != other {
                    let blob = String::from_utf8_lossy(blob);
                    let other = String::from_utf8_lossy(other);
                    dbg!(blob);
                    dbg!(other);
                    panic!("unexpected data");
                }
            }
            ActionData::Lines(lines) => {
                let mut rem = other;
                for line in lines.iter() {
                    let use_len = min(line.len(), rem.len());
                    let use_of_line = &line[..use_len];
                    let other = &rem[..use_len];
                    //TODO better error message (assert_eq is a BAD idea here as
                    // it will flood the output)
                    assert!(use_of_line.as_bytes() == other, "unexpected data");

                    if use_len < line.len() {
                        // we need more data => brake
                        break;
                    }

                    //check the "\r\n" omitted in Lines
                    rem = check_crlf_start(&rem[use_len..]);
                }
            }
        }
    }
}

fn check_crlf_start(tail: &[u8]) -> &[u8] {
    let mut tail = tail;
    let length = tail.len();
    if length >= 1 {
        assert!(
            tail[0] == b'\r',
            "unexpected data, expected '\\r' got {:?} in {}",
            tail[0] as char,
            String::from_utf8_lossy(tail)
        );
        tail = &tail[1..];
    }
    if length >= 2 {
        assert!(
            tail[0] == b'\n',
            "unexpected data, expected '\\n' got {:?}in {}",
            tail[0] as char,
            String::from_utf8_lossy(tail)
        );
        tail = &tail[1..];
    }

    tail
}

type Waker = mpsc::UnboundedSender<Task>;

#[derive(Debug)]
enum State {
    ServerIsWorking {
        waker: Waker,
        to_be_read: BytesMut,
    },
    ClientIsWorking {
        expected: ActionData,
        waker: Waker,
        input: BytesMut,
    },
    NeedNewAction {
        waker: Waker,
        buffer: BytesMut,
    },
    ShutdownOrPoison,
}

impl State {
    fn waker(&self) -> &Waker {
        match self {
            State::ServerIsWorking { waker, .. } => waker,
            State::ClientIsWorking { waker, .. } => waker,
            State::NeedNewAction { waker, .. } => waker,
            _ => panic!("trying to schedule wake up on shutdown stream"),
        }
    }
}

#[derive(Debug)]
pub struct MockSocket {
    conversation: Vec<(Actor, ActionData)>,
    fake_secure: bool,
    state: State,
    check_shutdown: bool,
}

/// MockSocket going through a pre-coded interlocked client-server conversation
///
/// The `client` is the part of the program reading to the socked using `poll_read`
/// and writing using `poll_write`, the server is the mock doing thinks in reserve,
/// i.e. reading when the client writes and writing when the server reads.
///
/// Internally it has following states:
///
/// - `ShutdownOrPoison`, it was shutdown or paniced at some point
/// - `ClientIsWorking`, the client is sending data and the server checks if it is
///   what it expects
/// - `ServerIsWorking`, the server sends back an pre-coded response
/// - `NeedNewAction`, the previous action was completed and a new one is needed
///
impl MockSocket {
    pub fn new(conversation: Vec<(Actor, ActionData)>) -> Self {
        Self::new_with_params(conversation, true)
    }

    pub fn new_no_check_shutdown(conversation: Vec<(Actor, ActionData)>) -> Self {
        Self::new_with_params(conversation, false)
    }

    /// create a new `MockSocket` from a sequence of "actions"
    ///
    /// Actions are taken interlocked between `Client` (client write something, server reads)
    /// and `Server` (server writes something, client reads), which is one of the main
    /// limitations of the Mock implementation.
    pub fn new_with_params(conversation: Vec<(Actor, ActionData)>, check_shutdown: bool) -> Self {
        let mut conversation = conversation;
        //queue => stack
        conversation.reverse();

        MockSocket {
            conversation,
            check_shutdown,
            fake_secure: false,
            state: State::NeedNewAction {
                buffer: BytesMut::new(),
                waker: delayed_waker(),
            },
        }
    }

    /// sets the state to `ShutdownOrPoison` and clears the conversation
    pub fn clear(&mut self) {
        self.conversation.clear();
        self.state = State::ShutdownOrPoison;
    }

    fn schedule_delayed_wake(&mut self) {
        self.state.waker().unbounded_send(task::current()).unwrap()
    }

    /// has a 1/16 chance to return `NotReady` and schedule the current `Task` to be notified later
    ///
    /// This is used to emulate that the connection is sometimes not ready jet
    /// e.g. because of network latencies. Yes, this makes the tests not 100% deterministic,
    /// but to get them in that direction and still test delays without hand encoding them
    /// would requires using something similar to `quick check`
    pub fn maybe_inject_not_ready(&mut self) -> Poll<(), std_io::Error> {
        // 1/16 chance to be not ready
        if random::<u8>() >= 124 {
            self.schedule_delayed_wake();
            Ok(Async::NotReady)
        } else {
            Ok(Async::Ready(()))
        }
    }

    /// creates the next state for given `waker` and `buffer`
    ///
    /// pop's the next action in the conversation if it's
    /// a `Server` action the returned state will be and
    /// `ServerIsWorking` state and the data of the action
    /// was fully written to the `buffer`. If it's a `Client`
    /// action a `ClientIsWorking` stat is returned.
    ///
    /// # Panics
    ///
    /// - if the conversation is done, i.e. if it is empty
    /// - the next state is a `Server` state and the passed in
    ///   buffer is not empty
    ///
    fn prepare_next(&mut self, waker: Waker, buffer: BytesMut) -> State {
        let (actor, data) = self
            .conversation
            .pop()
            .expect("prepare next on empty conversation");

        let mut buffer = buffer;

        match actor {
            Actor::Server => {
                // 1. data into() buffer
                assert!(
                    buffer.is_empty(),
                    "buffer had remaining input: {:?}",
                    String::from_utf8_lossy(buffer.as_ref())
                );
                buffer.reserve(data.len());
                match data {
                    ActionData::Lines(lines) => {
                        for line in lines {
                            buffer.put(line);
                            buffer.put("\r\n");
                        }
                    }
                    ActionData::Blob(blob) => {
                        buffer.put(blob);
                    }
                }
                State::ServerIsWorking {
                    waker,
                    to_be_read: buffer,
                }
            }
            Actor::Client => {
                // 1. clear buffer / reserve space in buffer
                State::ClientIsWorking {
                    expected: data,
                    waker,
                    input: buffer,
                }
            }
        }
    }
}

impl Drop for MockSocket {
    /// `drop` impl
    ///
    /// # Implementation Detail
    ///
    /// if the thread is not panicking it will panic:
    /// - if the socket was not shutdown
    /// - if the conversation did not end, i.e. was not empty
    fn drop(&mut self) {
        if !thread::panicking() {
            if self.check_shutdown {
                if let State::ShutdownOrPoison = self.state {
                } else {
                    panic!("connection was not shutdown");
                }
            }

            assert!(
                self.conversation.is_empty(),
                "premature cancellation of conversation"
            );
        }
    }
}

impl From<MockSocket> for crate::io::Socket {
    fn from(s: MockSocket) -> Self {
        crate::io::Socket::Mock(Box::new(s))
    }
}

impl From<MockSocket> for crate::io::Io {
    fn from(s: MockSocket) -> Self {
        let socket: crate::io::Socket = s.into();
        crate::io::Io::from((socket, crate::io::Buffers::new()))
    }
}

impl MockStream for MockSocket {
    fn is_secure(&self) -> bool {
        self.fake_secure
    }

    fn set_is_secure(&mut self, secure: bool) {
        self.fake_secure = secure;
    }
}

macro_rules! try_ready_or_would_block {
    ($expr:expr) => {{
        let res = $expr;
        match res {
            Ok(Async::Ready(t)) => t,
            Ok(Async::NotReady) => {
                return Err(std_io::Error::new(
                    std_io::ErrorKind::WouldBlock,
                    "Async::NotReady",
                ));
            }
            Err(err) => {
                return Err(err);
            }
        }
    }};
}

impl Read for MockSocket {
    fn read(&mut self, buf: &mut [u8]) -> Result<usize, std_io::Error> {
        Ok(try_ready_or_would_block!(self.poll_read(buf)))
    }
}

impl Write for MockSocket {
    fn write(&mut self, buf: &[u8]) -> Result<usize, std_io::Error> {
        Ok(try_ready_or_would_block!(self.poll_write(buf)))
    }

    fn flush(&mut self) -> Result<(), std_io::Error> {
        Ok(try_ready_or_would_block!(self.poll_flush()))
    }
}

// before read/write:
//   read/write --> state == NeedNewAction --> prepare next action
//                                               \-> on no next action
//                                                      |-> on read -> NotReady*
//                                                      \-> on write -> panic
//
// [*]: the client might call read until it read all "ready" data, even if the
//      read data already did contain all data it needs, so we can not panic here
//      through it might life lock the client in other situations, so we need to
//      build in a timeout into all tests
//
// on read:
//   read --> Actor == Server -> part of  buffer into read -> return bytes transmitted
//        |                                 \-> state = NeedNewAction
//        |
//        \-> Actor == Client ->  panic
//
// on write:
//   write --> Actor == Client -> read from buffer -> return ...
//         |                          \-> if "end condition"
//         |                                \-> assert read input == expected input
//         |                                       \-> state = NeedNewAction
//         |
//         \-> Actor == Server -> panic
//
// "end condition"
//    1st: read length >= expected read length
//    2nd: alt condition "\r\n.\r\n" read?
//
// inject NotReady return:
//   on before read
//   on read after transmitting >= 1 byte
//   on before write
//   on write after transmitting >= 1 byte
//
// on NotReady return:
//   send Task to DelayedWakerThread

impl AsyncRead for MockSocket {
    /// `poll_read` impl
    ///
    /// # Implementation Details
    ///
    /// - Can always return with `NotReady` before doing anything.
    /// - panics if the state is `ClientIsWorking` or `ShutdownOrPoison`
    /// - on `NeedNewAction` it advances the state to the next action if
    ///   there is any and returns `NotReady`
    /// - writes a random amount of bytes to the passed in read buffer
    ///   (at last 1), advancing the state to `NeedNewAction` once all bytes
    ///   have been read
    fn poll_read(&mut self, buf: &mut [u8]) -> Poll<usize, std_io::Error> {
        try_ready!(self.maybe_inject_not_ready());
        let state = mem::replace(&mut self.state, State::ShutdownOrPoison);
        match state {
            State::ShutdownOrPoison => panic!("tried reading from shutdown/poisoned stream"),
            State::ClientIsWorking { .. } => {
                panic!("tried to read from socket while it should only write to it")
            }
            State::NeedNewAction { waker, buffer } => {
                if self.conversation.is_empty() {
                    self.state = State::NeedNewAction { waker, buffer };
                } else {
                    self.state = self.prepare_next(waker, buffer);
                }
                self.schedule_delayed_wake();
                Ok(Async::NotReady)
            }
            State::ServerIsWorking {
                waker,
                mut to_be_read,
            } => {
                let rem = to_be_read.len();
                let can_write = buf.len();
                let should_write = random_amount(min(rem, can_write));

                write_n_to_slice(&to_be_read, buf, should_write);
                to_be_read.advance(should_write);

                if to_be_read.is_empty() {
                    self.state = State::NeedNewAction {
                        waker,
                        buffer: to_be_read,
                    }
                } else {
                    self.state = State::ServerIsWorking { waker, to_be_read }
                }
                Ok(Async::Ready(should_write))
            }
        }
    }
}

impl AsyncWrite for MockSocket {
    /// `poll_write` impl
    ///
    /// # Implementation Details
    ///
    /// - Can always return with `NotReady` before doing anything.
    /// - panics if the state is `ServerIsWorking` or `ShutdownOrPoison`
    /// - on `NeedNewAction` it advances the state to the next action and
    ///   returns `NotReady` panicing if there is no new action
    /// - writes a random amount of passed in bytes (at last 1) to the
    ///   input buffer then returns `Ready` with the written byte count
    fn poll_write(&mut self, buf: &[u8]) -> Poll<usize, std_io::Error> {
        try_ready!(self.maybe_inject_not_ready());
        let state = mem::replace(&mut self.state, State::ShutdownOrPoison);
        match state {
            State::ShutdownOrPoison => panic!("tried reading from shutdown/poisoned stream"),
            State::ServerIsWorking { .. } => {
                panic!("tried to write to socket while it should only read from it")
            }
            State::NeedNewAction { waker, buffer } => {
                self.state = self.prepare_next(waker, buffer);
                self.schedule_delayed_wake();
                Ok(Async::NotReady)
            }
            State::ClientIsWorking {
                expected,
                waker,
                mut input,
            } => {
                let amount = random_amount(buf.len());
                if input.remaining_mut() < amount {
                    input.reserve(amount)
                }
                input.put(&buf[..amount]);

                self.state = State::ClientIsWorking {
                    expected,
                    waker,
                    input,
                };
                Ok(Async::Ready(amount))
            }
        }
    }

    /// `poll_flush` impl
    ///
    /// # Implementation Details
    ///
    /// - Can always return with `NotReady` before doing anything.
    /// - panics if the state is `ServerIsWorking` or `ShutdownOrPoison`
    /// - on `NeedNewAction` it advances the state to the next action and
    ///   returns `NotReady`, _or_ if there is no further action returns
    ///   `Ready`
    /// - always returns `Ready` in the `ClientIsWorking` state if
    ///   it doesn't panic through a (test) assert
    /// - in `ClientIsWorking` it is asserted that the read buffer and
    ///   expected buffer start the same way (up the the min of the len
    ///   of both). If it is found that all bytes where parsed as expected
    ///   the state is advanced to `NeedNewAction`. If more bytes where
    ///   read then they stay in the buffer which will cause a panic
    ///   when advancing to the next action  if the next action is
    ///   not another `Client` action.
    ///
    ///
    fn poll_flush(&mut self) -> Poll<(), std_io::Error> {
        try_ready!(self.maybe_inject_not_ready());
        let state = mem::replace(&mut self.state, State::ShutdownOrPoison);
        match state {
            State::ShutdownOrPoison => panic!("tried reading from shutdown/poisoned stream"),
            State::ServerIsWorking { .. } => {
                panic!("tried to write to socket while it should only read from it")
            }
            State::NeedNewAction { waker, buffer } => {
                //poll flush on NeedNewAction + empty conversation should _not_ panic
                if self.conversation.is_empty() {
                    assert!(buffer.is_empty());
                    Ok(Async::Ready(()))
                } else {
                    self.state = self.prepare_next(waker, buffer);
                    self.schedule_delayed_wake();
                    Ok(Async::NotReady)
                }
            }
            State::ClientIsWorking {
                expected,
                waker,
                mut input,
            } => {
                // first: if !expected.starts_with(input) => assert panic
                expected.assert_same_start(&input);
                // then: if input >= expected { input.advance(expected.len()); state advance too
                let expected_len = expected.len();
                if input.len() >= expected_len {
                    input.advance(expected_len);
                    self.state = State::NeedNewAction {
                        waker,
                        buffer: input,
                    };
                    Ok(Async::Ready(()))
                } else {
                    self.state = State::ClientIsWorking {
                        expected,
                        waker,
                        input,
                    };
                    Ok(Async::Ready(()))
                }
            }
        }
    }

    /// `shutdown` impl
    ///
    /// # Implementation Details
    ///
    /// - can return `NotReady` in any state
    /// - uses `poll_flush` until everything is flushed
    /// - If state is not `ShutdownOrPoison` or `NeedNewAction` it will panic.
    /// - Be aware that due to the call to flush a state
    ///   of `ClientIsWorking` is likely to change or panic
    ///   in `poll_flush` instead of in `shutdown`
    ///
    ///
    fn shutdown(&mut self) -> Poll<(), std_io::Error> {
        // shutdown implies flush, so we flush
        try_ready!(self.poll_flush());
        match &self.state {
            &State::ShutdownOrPoison => (),
            &State::NeedNewAction { .. } => (),
            _ => panic!("unexpected state when shutting down"),
        }
        self.state = State::ShutdownOrPoison;
        Ok(Async::Ready(()))
    }
}

/// returns a random number in `[1; max_inclusive]`, where` max_inclusive` is the most likely value
///
/// Note: `random_amount(0)` always returns 0, any other value returns a number
/// between 1 and the value (inclusive). With some slight bias to higher values.
fn random_amount(max_inclusive: usize) -> usize {
    // make it more "likely" to write more stuff
    // (this is statistically horrible hack, but works fine here)
    // (the +1 is to ofset the exclusiveness of gen_range and the 16 to have a slight bias for higher values)
    min(
        max_inclusive,
        thread_rng().gen_range(1, max_inclusive + 1 + 16),
    )
}

/// copies `from[..n]` to `to[..n]`
fn write_n_to_slice(from: &[u8], to: &mut [u8], n: usize) {
    to[..n].copy_from_slice(&from[..n]);
}

fn delayed_waker() -> mpsc::UnboundedSender<Task> {
    let (tx, rx) = mpsc::unbounded();
    thread::spawn(move || {
        let pipe = rx.for_each(|task: Task| {
            //sleep some smallish random time
            //sleep between ~ 0ms - 4ms
            let nanos = random::<u32>() / 1000;
            thread::sleep(Duration::new(0, nanos));

            task.notify();
            future::ok::<(), ()>(())
        });

        pipe.wait().unwrap()
    });

    tx
}

#[cfg(test)]
mod test {
    #![allow(non_snake_case)]
    use std::thread;
    use std::time::Duration;

    use futures::sync::oneshot;
    use futures::{future, Future};

    fn time_out(secs: u64) -> Box<dyn Future<Item = (), Error = ()>> {
        let (tx, rx) = oneshot::channel();
        thread::spawn(move || {
            thread::sleep(Duration::new(secs, 0));
            let _ = tx.send(());
        });

        Box::new(rx.then(|_| future::ok(())))
    }

    mod random_amount {
        use super::super::random_amount;

        #[test]
        fn on_1() {
            for _ in 0..100 {
                assert_eq!(random_amount(1), 1);
            }
        }

        #[test]
        fn on_0() {
            for _ in 0..100 {
                assert_eq!(random_amount(0), 0);
            }
        }

        #[test]
        fn on_X() {
            let x = 10;
            for _ in 0..100 {
                let got = random_amount(x);
                assert!(got >= 1);
                assert!(got <= x);
            }
        }
    }

    mod delayed_waker {
        use futures::Future;

        use super::super::*;
        use super::time_out;

        fn wake_task_later(waker: &Waker) {
            waker.unbounded_send(task::current()).unwrap()
        }

        #[test]
        fn calls_notify() {
            let waker = delayed_waker();

            let mut is_first = true;
            let fut = future::poll_fn(|| -> Poll<(), ()> {
                if is_first {
                    is_first = false;
                    wake_task_later(&waker);
                    Ok(Async::NotReady)
                } else {
                    Ok(Async::Ready(()))
                }
            });

            match fut.select2(time_out(1)).wait() {
                Ok(future::Either::A(_)) => (),
                Ok(future::Either::B(_)) => panic!("time out occured"),
                Err(_e) => unreachable!(),
            }
        }
    }

    mod ActionData {
        use super::super::ActionData;
        use std::panic;

        fn should_fail<FN>(func: FN)
        where
            FN: panic::UnwindSafe + FnOnce(),
        {
            match panic::catch_unwind(func) {
                Ok(_) => panic!("closure should have paniced"),
                Err(_) => (),
            }
        }

        #[should_panic]
        #[test]
        fn should_fail_should_panic_on_ok() {
            should_fail(|| ())
        }

        mod len {
            use super::*;

            #[test]
            fn len_blob() {
                let blob = ActionData::Blob("la blob".to_owned().into());
                assert_eq!(blob.len(), 7)
            }

            #[test]
            fn len_lines() {
                let lines = ActionData::Lines(vec!["123", "678"]);

                assert_eq!(lines.len(), 10)
            }
        }

        mod assert_start_same {
            use super::*;

            #[test]
            fn blob_smaller_other() {
                let blob = ActionData::Blob("blob".to_owned().into());
                blob.assert_same_start(b"blo" as &[u8]);
                should_fail(|| blob.assert_same_start(b"blO" as &[u8]));
            }

            #[test]
            fn blob_larger_other() {
                let blob = ActionData::Blob("blob".to_owned().into());
                blob.assert_same_start(b"blob and top" as &[u8]);
                should_fail(|| blob.assert_same_start(b"bloB and top" as &[u8]));
            }

            #[test]
            fn lines_smaller_other() {
                let lines = ActionData::Lines(vec!["123", "678"]);
                lines.assert_same_start(b"123\r\n6" as &[u8]);
                should_fail(|| lines.assert_same_start(b"123\r\n7" as &[u8]));
                should_fail(|| lines.assert_same_start(b"123\n\n6" as &[u8]));
            }

            #[test]
            fn lines_same_len_other() {
                let lines = ActionData::Lines(vec!["123", "678"]);
                lines.assert_same_start(b"123\r\n678\r\n" as &[u8]);
                should_fail(|| lines.assert_same_start(b"123\r\n678\r\r" as &[u8]));
            }

            #[test]
            fn lines_larger_other() {
                let lines = ActionData::Lines(vec!["123", "678"]);
                lines.assert_same_start(b"123\r\n678\r\nho" as &[u8]);
                should_fail(|| lines.assert_same_start(b"123\r\n678\rho" as &[u8]));
            }
        }
    }

    mod MockSocket {

        use bytes::Bytes;

        use super::super::*;
        use super::time_out;

        mod shutdown {
            use super::*;

            #[should_panic]
            #[test]
            fn on_still_working_socket() {
                let waker = delayed_waker();
                let mut socket = MockSocket::new(vec![]);
                socket.state = State::ServerIsWorking {
                    waker,
                    to_be_read: BytesMut::new(),
                };

                let _res = future::poll_fn(|| socket.shutdown())
                    .select(
                        time_out(1).then(|_| -> Result<(), std_io::Error> { panic!("timeout") }),
                    )
                    .wait();
            }

            #[test]
            fn on_done_conversation() {
                let mut socket = MockSocket::new(vec![]);

                let res = future::poll_fn(|| socket.shutdown())
                    .select(
                        time_out(1).then(|_| -> Result<(), std_io::Error> { panic!("timeout") }),
                    )
                    .wait();

                match res {
                    Ok(_) => (),
                    Err((err, _)) => panic!("unexpected error {:?}", err),
                }
            }
        }

        #[test]
        fn with_simple_session() {
            use self::ActionData::*;
            use self::Actor::*;

            let mut socket = Some(MockSocket::new(vec![
                (Server, Blob("hy there\r\n".as_bytes().to_owned())),
                (Client, Blob("quit\r\n".as_bytes().to_owned())),
            ]));

            let fut = future::poll_fn({
                let mut buf = Box::new([0u8, 0, 0, 0]) as Box<[u8]>;
                let mut expect = b"hy there\r\n" as &[u8];
                move || -> Poll<Option<MockSocket>, std_io::Error> {
                    loop {
                        let n = try_ready!(socket.as_mut().unwrap().poll_read(&mut buf));

                        assert!(n > 0);
                        let read = &buf[..n];
                        let (use_expected, new_expected) = expect.split_at(n);
                        expect = new_expected;
                        assert_eq!(use_expected, read);

                        if expect.is_empty() {
                            return Ok(Async::Ready(socket.take()));
                        }
                    }
                }
            })
            .and_then(|mut socket| {
                future::poll_fn({
                    let mut bytes = Bytes::from("quit\r\n");
                    move || loop {
                        let n = try_ready!(socket.as_mut().unwrap().poll_write(&bytes));

                        assert!(n > 0);
                        bytes.advance(n);
                        if bytes.is_empty() {
                            return Ok(Async::Ready(socket.take()));
                        }
                    }
                })
            })
            .and_then(|mut socket| {
                future::poll_fn(move || {
                    try_ready!(socket.as_mut().unwrap().shutdown());
                    Ok(Async::Ready(()))
                })
            })
            .select2(time_out(1));

            match fut.wait() {
                Ok(future::Either::A(_)) => (),
                Ok(future::Either::B(((), _))) => panic!("timeout"),
                Err(_e) => unreachable!(),
            }
        }
    }
}