1
  2
  3
  4
  5
  6
  7
  8
  9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
//! Provides a wrapper for streams, to test blocking and errors.

use futures::{Sink, StartSend, Poll, task, Async, Stream};
use quickcheck::{Arbitrary, Gen};

/// What to do the next time `poll` is called on a `TestStream`.
///
/// One `PollOp` is consumed from the configured iterator per `poll` call.
#[derive(Clone, Debug, PartialEq)]
pub enum PollOp<E> {
    /// Simply delegate to the underlying Stream.
    Delegate,

    /// Return `Async::NotReady` instead of calling into the underlying
    /// Stream. The current task is notified immediately, before returning.
    NotReady,

    /// Return this error instead of calling into the underlying Stream.
    Err(E),
}

impl<E> Arbitrary for PollOp<E>
    where E: Arbitrary
{
    /// Produces `NotReady` when the drawn `f32` is below 0.25 and `Delegate`
    /// otherwise (i.e. 25% `NotReady`, 75% `Delegate`).
    ///
    /// `PollOp::Err` is never generated here.
    fn arbitrary<G: Gen>(g: &mut G) -> Self {
        let roll = g.next_f32();
        if roll < 0.25 {
            return PollOp::NotReady;
        }
        PollOp::Delegate
    }
}

/// A Stream wrapper that modifies `poll` operations of the inner Stream
/// according to the provided iterator of `PollOp`s.
pub struct TestStream<S: Stream> {
    /// The wrapped Stream that delegated polls are forwarded to.
    inner: S,
    /// Scripted actions; `poll` takes one item from this iterator per call
    /// and delegates to `inner` once it yields `None`.
    poll_ops: Box<Iterator<Item = PollOp<S::Error>> + Send>,
}

impl<S: Stream> TestStream<S> {
    /// Creates a new `TestStream` wrapper over the Samtre with the specified `PollOps`s.
    pub fn new<I>(inner: S, poll_iter: I) -> Self
        where I: IntoIterator<Item = PollOp<S::Error>> + 'static,
              I::IntoIter: Send
    {
        TestStream {
            inner: inner,
            poll_ops: Box::new(poll_iter.into_iter().fuse()),
        }
    }

    /// Sets the `PollOp`s for this Stream.
    pub fn set_poll_ops<I>(&mut self, poll_iter: I) -> &mut Self
        where I: IntoIterator<Item = PollOp<S::Error>> + 'static,
              I::IntoIter: Send
    {
        self.poll_ops = Box::new(poll_iter.into_iter().fuse());
        self
    }

    /// Acquires a reference to the underlying Sink.
    pub fn get_ref(&self) -> &S {
        &self.inner
    }

    /// Acquires a mutable reference to the underlying Sink.
    pub fn get_mut(&mut self) -> &mut S {
        &mut self.inner
    }

    /// Consumes this wrapper, returning the underlying Sink.
    pub fn into_inner(self) -> S {
        self.inner
    }
}

impl<S: Stream> Stream for TestStream<S> {
    type Item = S::Item;
    type Error = S::Error;

    /// Polls the wrapper, consuming one `PollOp` from the script per call.
    ///
    /// * `Delegate`, or an exhausted script: forwards to the inner Stream.
    /// * `NotReady`: notifies the current task, then returns
    ///   `Async::NotReady` without touching the inner Stream.
    /// * `Err(e)`: returns `e` without touching the inner Stream.
    fn poll(&mut self) -> Poll<Option<Self::Item>, Self::Error> {
        // Running out of scripted operations is treated as `Delegate`.
        let op = self.poll_ops.next().unwrap_or(PollOp::Delegate);
        match op {
            PollOp::Delegate => self.inner.poll(),
            PollOp::Err(err) => Err(err),
            PollOp::NotReady => {
                // Wake the task right away so the caller polls again.
                task::current().notify();
                Ok(Async::NotReady)
            }
        }
    }
}

/// Sink passthrough for wrapped types that are both a `Sink` and a `Stream`.
///
/// Every method forwards directly to the inner value; the `PollOp` script is
/// not consulted by any of them.
impl<S: Sink + Stream> Sink for TestStream<S> {
    type SinkItem = S::SinkItem;
    type SinkError = S::SinkError;

    /// Forwards `item` to the inner Sink's `start_send`.
    fn start_send(&mut self, item: Self::SinkItem) -> StartSend<Self::SinkItem, Self::SinkError> {
        self.inner.start_send(item)
    }

    /// Forwards to the inner Sink's `poll_complete`.
    fn poll_complete(&mut self) -> Poll<(), Self::SinkError> {
        self.inner.poll_complete()
    }

    /// Forwards to the inner Sink's `close`.
    fn close(&mut self) -> Poll<(), Self::SinkError> {
        self.inner.close()
    }
}