futures 0.1.16

An implementation of futures and streams featuring zero allocations, composability, and iterator-like interfaces.
Documentation
use core::mem;
use core::marker::PhantomData;

use {IntoFuture, Future, Poll, Async, StartSend, AsyncSink};
use sink::Sink;
use stream::Stream;

/// Sink for the `Sink::with` combinator, chaining a computation to run *prior*
/// to pushing a value into the underlying sink.
#[derive(Debug)]
#[must_use = "sinks do nothing unless polled"]
pub struct With<S, U, F, Fut>
    where S: Sink,
          F: FnMut(U) -> Fut,
          Fut: IntoFuture,
{
    sink: S,
    f: F,
    state: State<Fut::Future, S::SinkItem>,
    _phantom: PhantomData<fn(U)>,
}

#[derive(Debug)]
enum State<Fut, T> {
    Empty,
    Process(Fut),
    Buffered(T),
}

impl<Fut, T> State<Fut, T> {
    fn is_empty(&self) -> bool {
        if let State::Empty = *self {
            true
        } else {
            false
        }
    }
}

pub fn new<S, U, F, Fut>(sink: S, f: F) -> With<S, U, F, Fut>
    where S: Sink,
          F: FnMut(U) -> Fut,
          Fut: IntoFuture<Item = S::SinkItem>,
          Fut::Error: From<S::SinkError>,
{
    With {
        state: State::Empty,
        sink: sink,
        f: f,
        _phantom: PhantomData,
    }
}

// Forwarding impl of Stream from the underlying sink
impl<S, U, F, Fut> Stream for With<S, U, F, Fut>
    where S: Stream + Sink,
          F: FnMut(U) -> Fut,
          Fut: IntoFuture
{
    type Item = S::Item;
    type Error = S::Error;

    fn poll(&mut self) -> Poll<Option<S::Item>, S::Error> {
        self.sink.poll()
    }
}

impl<S, U, F, Fut> With<S, U, F, Fut>
    where S: Sink,
          F: FnMut(U) -> Fut,
          Fut: IntoFuture<Item = S::SinkItem>,
          Fut::Error: From<S::SinkError>,
{
    /// Get a shared reference to the inner sink.
    pub fn get_ref(&self) -> &S {
        &self.sink
    }

    /// Get a mutable reference to the inner sink.
    pub fn get_mut(&mut self) -> &mut S {
        &mut self.sink
    }

    /// Consumes this combinator, returning the underlying sink.
    ///
    /// Note that this may discard intermediate state of this combinator, so
    /// care should be taken to avoid losing resources when this is called.
    pub fn into_inner(self) -> S {
        self.sink
    }

    fn poll(&mut self) -> Poll<(), Fut::Error> {
        loop {
            match mem::replace(&mut self.state, State::Empty) {
                State::Empty => break,
                State::Process(mut fut) => {
                    match fut.poll()? {
                        Async::Ready(item) => {
                            self.state = State::Buffered(item);
                        }
                        Async::NotReady => {
                            self.state = State::Process(fut);
                            break
                        }
                    }
                }
                State::Buffered(item) => {
                    if let AsyncSink::NotReady(item) = self.sink.start_send(item)? {
                        self.state = State::Buffered(item);
                        break
                    }
                }
            }
        }

        if self.state.is_empty() {
            Ok(Async::Ready(()))
        } else {
            Ok(Async::NotReady)
        }
    }
}

impl<S, U, F, Fut> Sink for With<S, U, F, Fut>
    where S: Sink,
          F: FnMut(U) -> Fut,
          Fut: IntoFuture<Item = S::SinkItem>,
          Fut::Error: From<S::SinkError>,
{
    type SinkItem = U;
    type SinkError = Fut::Error;

    fn start_send(&mut self, item: Self::SinkItem) -> StartSend<Self::SinkItem, Fut::Error> {
        if self.poll()?.is_not_ready() {
            return Ok(AsyncSink::NotReady(item))
        }
        self.state = State::Process((self.f)(item).into_future());
        Ok(AsyncSink::Ready)
    }

    fn poll_complete(&mut self) -> Poll<(), Fut::Error> {
        // poll ourselves first, to push data downward
        let me_ready = self.poll()?;
        // always propagate `poll_complete` downward to attempt to make progress
        try_ready!(self.sink.poll_complete());
        Ok(me_ready)
    }

    fn close(&mut self) -> Poll<(), Fut::Error> {
        try_ready!(self.poll());
        Ok(self.sink.close()?)
    }
}