futures 0.1.26

An implementation of futures and streams featuring zero allocations, composability, and iterator-like interfaces.
Documentation
//! Asynchronous streams
//!
//! This module contains the `Stream` trait and a number of adaptors for this
//! trait. This trait is very similar to the `Iterator` trait in the standard
//! library except that it expresses the concept of blocking as well. A stream
//! here is a sequential sequence of values which may take some amount of time
//! in between to produce.
//!
//! A stream may request that it is blocked between values while the next value
//! is calculated, and provides a way to get notified once the next value is
//! ready as well.
//!
//! You can find more information/tutorials about streams [online at
//! https://tokio.rs][online]
//!
//! [online]: https://tokio.rs/docs/getting-started/streams-and-sinks/

use {IntoFuture, Poll};

mod iter;
#[allow(deprecated)]
pub use self::iter::{iter, Iter};
#[cfg(feature = "with-deprecated")]
#[allow(deprecated)]
pub use self::Iter as IterStream;
mod iter_ok;
pub use self::iter_ok::{iter_ok, IterOk};
mod iter_result;
pub use self::iter_result::{iter_result, IterResult};

mod repeat;
pub use self::repeat::{repeat, Repeat};

mod and_then;
mod chain;
mod concat;
mod empty;
mod filter;
mod filter_map;
mod flatten;
mod fold;
mod for_each;
mod from_err;
mod fuse;
mod future;
mod inspect;
mod inspect_err;
mod map;
mod map_err;
mod merge;
mod once;
mod or_else;
mod peek;
mod poll_fn;
mod select;
mod skip;
mod skip_while;
mod take;
mod take_while;
mod then;
mod unfold;
mod zip;
mod forward;
pub use self::and_then::AndThen;
pub use self::chain::Chain;
#[allow(deprecated)]
pub use self::concat::Concat;
pub use self::concat::Concat2;
pub use self::empty::{Empty, empty};
pub use self::filter::Filter;
pub use self::filter_map::FilterMap;
pub use self::flatten::Flatten;
pub use self::fold::Fold;
pub use self::for_each::ForEach;
pub use self::from_err::FromErr;
pub use self::fuse::Fuse;
pub use self::future::StreamFuture;
pub use self::inspect::Inspect;
pub use self::inspect_err::InspectErr;
pub use self::map::Map;
pub use self::map_err::MapErr;
#[allow(deprecated)]
pub use self::merge::{Merge, MergedItem};
pub use self::once::{Once, once};
pub use self::or_else::OrElse;
pub use self::peek::Peekable;
pub use self::poll_fn::{poll_fn, PollFn};
pub use self::select::Select;
pub use self::skip::Skip;
pub use self::skip_while::SkipWhile;
pub use self::take::Take;
pub use self::take_while::TakeWhile;
pub use self::then::Then;
pub use self::unfold::{Unfold, unfold};
pub use self::zip::Zip;
pub use self::forward::Forward;
use sink::{Sink};

if_std! {
    use std;

    mod buffered;
    mod buffer_unordered;
    mod catch_unwind;
    mod chunks;
    mod collect;
    mod wait;
    mod channel;
    mod split;
    pub mod futures_unordered;
    mod futures_ordered;
    pub use self::buffered::Buffered;
    pub use self::buffer_unordered::BufferUnordered;
    pub use self::catch_unwind::CatchUnwind;
    pub use self::chunks::Chunks;
    pub use self::collect::Collect;
    pub use self::wait::Wait;
    pub use self::split::{SplitStream, SplitSink, ReuniteError};
    pub use self::futures_unordered::FuturesUnordered;
    pub use self::futures_ordered::{futures_ordered, FuturesOrdered};

    #[doc(hidden)]
    #[cfg(feature = "with-deprecated")]
    #[allow(deprecated)]
    pub use self::channel::{channel, Sender, Receiver, FutureSender, SendError};

    /// A type alias for `Box<Stream + Send>`
    #[doc(hidden)]
    #[deprecated(note = "removed without replacement, recommended to use a \
                         local extension trait or function if needed, more \
                         details in https://github.com/rust-lang-nursery/futures-rs/issues/228")]
    pub type BoxStream<T, E> = ::std::boxed::Box<Stream<Item = T, Error = E> + Send>;

    impl<S: ?Sized + Stream> Stream for ::std::boxed::Box<S> {
        type Item = S::Item;
        type Error = S::Error;

        fn poll(&mut self) -> Poll<Option<Self::Item>, Self::Error> {
            (**self).poll()
        }
    }
}

/// A stream of values, not all of which may have been produced yet.
///
/// `Stream` is a trait to represent any source of sequential events or items
/// which acts like an iterator but long periods of time may pass between
/// items. Like `Future` the methods of `Stream` never block and it is thus
/// suitable for programming in an asynchronous fashion. This trait is very
/// similar to the `Iterator` trait in the standard library where `Some` is
/// used to signal elements of the stream and `None` is used to indicate that
/// the stream is finished.
///
/// Like futures a stream has basic combinators to transform the stream, perform
/// more work on each item, etc.
///
/// You can find more information/tutorials about streams [online at
/// https://tokio.rs][online]
///
/// [online]: https://tokio.rs/docs/getting-started/streams-and-sinks/
///
/// # Streams as Futures
///
/// Any instance of `Stream` can also be viewed as a `Future` where the resolved
/// value is the next item in the stream along with the rest of the stream. The
/// `into_future` adaptor can be used here to convert any stream into a future
/// for use with other future methods like `join` and `select`.
///
/// # Errors
///
/// Streams, like futures, can also model errors in their computation. All
/// streams have an associated `Error` type like with futures. Currently as of
/// the 0.1 release of this library an error on a stream **does not terminate
/// the stream**. That is, after one error is received, another error may be
/// received from the same stream (it's valid to keep polling).
///
/// This property of streams, however, is [being considered] for change in 0.2
/// where an error on a stream is similar to `None`, it terminates the stream
/// entirely. If one of these use cases suits you perfectly and not the other,
/// please feel welcome to comment on [the issue][being considered]!
///
/// [being considered]: https://github.com/rust-lang-nursery/futures-rs/issues/206
pub trait Stream {
    /// The type of item this stream will yield on success.
    type Item;

    /// The type of error this stream may generate.
    type Error;

    /// Attempt to pull out the next value of this stream, returning `None` if
    /// the stream is finished.
    ///
    /// This method, like `Future::poll`, is the sole method of pulling out a
    /// value from a stream. This method must also be run within the context of
    /// a task typically and implementors of this trait must ensure that
    /// implementations of this method do not block, as it may cause consumers
    /// to behave badly.
    ///
    /// # Return value
    ///
    /// If `NotReady` is returned then this stream's next value is not ready
    /// yet and implementations will ensure that the current task will be
    /// notified when the next value may be ready. If `Some` is returned then
    /// the returned value represents the next value on the stream. `Err`
    /// indicates an error happened, while `Ok` indicates whether there was a
    /// new item on the stream or whether the stream has terminated.
    ///
    /// # Panics
    ///
    /// Once a stream is finished, that is `Ready(None)` has been returned,
    /// further calls to `poll` may result in a panic or other "bad behavior".
    /// If this is difficult to guard against then the `fuse` adapter can be
    /// used to ensure that `poll` always has well-defined semantics.
    // TODO: more here
    fn poll(&mut self) -> Poll<Option<Self::Item>, Self::Error>;

    // TODO: should there also be a method like `poll` but doesn't return an
    //       item? basically just says "please make more progress internally"
    //       seems crucial for buffering to actually make any sense.

    /// Creates an iterator which blocks the current thread until each item of
    /// this stream is resolved.
    ///
    /// This method will consume ownership of this stream, returning an
    /// implementation of a standard iterator. This iterator will *block the
    /// current thread* on each call to `next` if the item in the stream isn't
    /// ready yet.
    ///
    /// > **Note:** This method is not appropriate to call on event loops or
    /// >           similar I/O situations because it will prevent the event
    /// >           loop from making progress (this blocks the thread). This
    /// >           method should only be called when it's guaranteed that the
    /// >           blocking work associated with this stream will be completed
    /// >           by another thread.
    ///
    /// This method is only available when the `use_std` feature of this
    /// library is activated, and it is activated by default.
    ///
    /// # Panics
    ///
    /// The returned iterator does not attempt to catch panics. If the `poll`
    /// function panics, panics will be propagated to the caller of `next`.
    #[cfg(feature = "use_std")]
    fn wait(self) -> Wait<Self>
        where Self: Sized
    {
        wait::new(self)
    }

    /// Convenience function for turning this stream into a trait object.
    ///
    /// This simply avoids the need to write `Box::new` and can often help with
    /// type inference as well by always returning a trait object. Note that
    /// this method requires the `Send` bound and returns a `BoxStream`, which
    /// also encodes this. If you'd like to create a `Box<Stream>` without the
    /// `Send` bound, then the `Box::new` function can be used instead.
    ///
    /// This method is only available when the `use_std` feature of this
    /// library is activated, and it is activated by default.
    ///
    /// # Examples
    ///
    /// ```
    /// use futures::stream::*;
    /// use futures::sync::mpsc;
    ///
    /// let (_tx, rx) = mpsc::channel(1);
    /// let a: BoxStream<i32, ()> = rx.boxed();
    /// ```
    #[cfg(feature = "use_std")]
    #[doc(hidden)]
    #[deprecated(note = "removed without replacement, recommended to use a \
                         local extension trait or function if needed, more \
                         details in https://github.com/rust-lang-nursery/futures-rs/issues/228")]
    #[allow(deprecated)]
    fn boxed(self) -> BoxStream<Self::Item, Self::Error>
        where Self: Sized + Send + 'static,
    {
        ::std::boxed::Box::new(self)
    }

    /// Converts this stream into a `Future`.
    ///
    /// A stream can be viewed as a future which will resolve to a pair containing
    /// the next element of the stream plus the remaining stream. If the stream
    /// terminates, then the next element is `None` and the remaining stream is
    /// still passed back, to allow reclamation of its resources.
    ///
    /// The returned future can be used to compose streams and futures together by
    /// placing everything into the "world of futures".
    fn into_future(self) -> StreamFuture<Self>
        where Self: Sized
    {
        future::new(self)
    }

    /// Converts a stream of type `T` to a stream of type `U`.
    ///
    /// The provided closure is executed over all elements of this stream as
    /// they are made available, and the callback will be executed inline with
    /// calls to `poll`.
    ///
    /// Note that this function consumes the receiving stream and returns a
    /// wrapped version of it, similar to the existing `map` methods in the
    /// standard library.
    ///
    /// # Examples
    ///
    /// ```
    /// use futures::prelude::*;
    /// use futures::sync::mpsc;
    ///
    /// let (_tx, rx) = mpsc::channel::<i32>(1);
    /// let rx = rx.map(|x| x + 3);
    /// ```
    fn map<U, F>(self, f: F) -> Map<Self, F>
        where F: FnMut(Self::Item) -> U,
              Self: Sized
    {
        map::new(self, f)
    }

    /// Converts a stream of error type `T` to a stream of error type `U`.
    ///
    /// The provided closure is executed over all errors of this stream as
    /// they are made available, and the callback will be executed inline with
    /// calls to `poll`.
    ///
    /// Note that this function consumes the receiving stream and returns a
    /// wrapped version of it, similar to the existing `map_err` methods in the
    /// standard library.
    ///
    /// # Examples
    ///
    /// ```
    /// use futures::prelude::*;
    /// use futures::sync::mpsc;
    ///
    /// let (_tx, rx) = mpsc::channel::<i32>(1);
    /// let rx = rx.map_err(|()| 3);
    /// ```
    fn map_err<U, F>(self, f: F) -> MapErr<Self, F>
        where F: FnMut(Self::Error) -> U,
              Self: Sized
    {
        map_err::new(self, f)
    }

    /// Filters the values produced by this stream according to the provided
    /// predicate.
    ///
    /// As values of this stream are made available, the provided predicate will
    /// be run against them. If the predicate returns `true` then the stream
    /// will yield the value, but if the predicate returns `false` then the
    /// value will be discarded and the next value will be produced.
    ///
    /// All errors are passed through without filtering in this combinator.
    ///
    /// Note that this function consumes the receiving stream and returns a
    /// wrapped version of it, similar to the existing `filter` methods in the
    /// standard library.
    ///
    /// # Examples
    ///
    /// ```
    /// use futures::prelude::*;
    /// use futures::sync::mpsc;
    ///
    /// let (_tx, rx) = mpsc::channel::<i32>(1);
    /// let evens = rx.filter(|x| x % 2 == 0);
    /// ```
    fn filter<F>(self, f: F) -> Filter<Self, F>
        where F: FnMut(&Self::Item) -> bool,
              Self: Sized
    {
        filter::new(self, f)
    }

    /// Filters the values produced by this stream while simultaneously mapping
    /// them to a different type.
    ///
    /// As values of this stream are made available, the provided function will
    /// be run on them. If the predicate returns `Some(e)` then the stream will
    /// yield the value `e`, but if the predicate returns `None` then the next
    /// value will be produced.
    ///
    /// All errors are passed through without filtering in this combinator.
    ///
    /// Note that this function consumes the receiving stream and returns a
    /// wrapped version of it, similar to the existing `filter_map` methods in the
    /// standard library.
    ///
    /// # Examples
    ///
    /// ```
    /// use futures::prelude::*;
    /// use futures::sync::mpsc;
    ///
    /// let (_tx, rx) = mpsc::channel::<i32>(1);
    /// let evens_plus_one = rx.filter_map(|x| {
    ///     if x % 0 == 2 {
    ///         Some(x + 1)
    ///     } else {
    ///         None
    ///     }
    /// });
    /// ```
    fn filter_map<F, B>(self, f: F) -> FilterMap<Self, F>
        where F: FnMut(Self::Item) -> Option<B>,
              Self: Sized
    {
        filter_map::new(self, f)
    }

    /// Chain on a computation for when a value is ready, passing the resulting
    /// item to the provided closure `f`.
    ///
    /// This function can be used to ensure a computation runs regardless of
    /// the next value on the stream. The closure provided will be yielded a
    /// `Result` once a value is ready, and the returned future will then be run
    /// to completion to produce the next value on this stream.
    ///
    /// The returned value of the closure must implement the `IntoFuture` trait
    /// and can represent some more work to be done before the composed stream
    /// is finished. Note that the `Result` type implements the `IntoFuture`
    /// trait so it is possible to simply alter the `Result` yielded to the
    /// closure and return it.
    ///
    /// Note that this function consumes the receiving stream and returns a
    /// wrapped version of it.
    ///
    /// # Examples
    ///
    /// ```
    /// use futures::prelude::*;
    /// use futures::sync::mpsc;
    ///
    /// let (_tx, rx) = mpsc::channel::<i32>(1);
    ///
    /// let rx = rx.then(|result| {
    ///     match result {
    ///         Ok(e) => Ok(e + 3),
    ///         Err(()) => Err(4),
    ///     }
    /// });
    /// ```
    fn then<F, U>(self, f: F) -> Then<Self, F, U>
        where F: FnMut(Result<Self::Item, Self::Error>) -> U,
              U: IntoFuture,
              Self: Sized
    {
        then::new(self, f)
    }

    /// Chain on a computation for when a value is ready, passing the successful
    /// results to the provided closure `f`.
    ///
    /// This function can be used to run a unit of work when the next successful
    /// value on a stream is ready. The closure provided will be yielded a value
    /// when ready, and the returned future will then be run to completion to
    /// produce the next value on this stream.
    ///
    /// Any errors produced by this stream will not be passed to the closure,
    /// and will be passed through.
    ///
    /// The returned value of the closure must implement the `IntoFuture` trait
    /// and can represent some more work to be done before the composed stream
    /// is finished. Note that the `Result` type implements the `IntoFuture`
    /// trait so it is possible to simply alter the `Result` yielded to the
    /// closure and return it.
    ///
    /// Note that this function consumes the receiving stream and returns a
    /// wrapped version of it.
    ///
    /// To process the entire stream and return a single future representing
    /// success or error, use `for_each` instead.
    ///
    /// # Examples
    ///
    /// ```
    /// use futures::prelude::*;
    /// use futures::sync::mpsc;
    ///
    /// let (_tx, rx) = mpsc::channel::<i32>(1);
    ///
    /// let rx = rx.and_then(|result| {
    ///     if result % 2 == 0 {
    ///         Ok(result)
    ///     } else {
    ///         Err(())
    ///     }
    /// });
    /// ```
    fn and_then<F, U>(self, f: F) -> AndThen<Self, F, U>
        where F: FnMut(Self::Item) -> U,
              U: IntoFuture<Error = Self::Error>,
              Self: Sized
    {
        and_then::new(self, f)
    }

    /// Chain on a computation for when an error happens, passing the
    /// erroneous result to the provided closure `f`.
    ///
    /// This function can be used to run a unit of work and attempt to recover from
    /// an error if one happens. The closure provided will be yielded an error
    /// when one appears, and the returned future will then be run to completion
    /// to produce the next value on this stream.
    ///
    /// Any successful values produced by this stream will not be passed to the
    /// closure, and will be passed through.
    ///
    /// The returned value of the closure must implement the `IntoFuture` trait
    /// and can represent some more work to be done before the composed stream
    /// is finished. Note that the `Result` type implements the `IntoFuture`
    /// trait so it is possible to simply alter the `Result` yielded to the
    /// closure and return it.
    ///
    /// Note that this function consumes the receiving stream and returns a
    /// wrapped version of it.
    fn or_else<F, U>(self, f: F) -> OrElse<Self, F, U>
        where F: FnMut(Self::Error) -> U,
              U: IntoFuture<Item = Self::Item>,
              Self: Sized
    {
        or_else::new(self, f)
    }

    /// Collect all of the values of this stream into a vector, returning a
    /// future representing the result of that computation.
    ///
    /// This combinator will collect all successful results of this stream and
    /// collect them into a `Vec<Self::Item>`. If an error happens then all
    /// collected elements will be dropped and the error will be returned.
    ///
    /// The returned future will be resolved whenever an error happens or when
    /// the stream returns `Ok(None)`.
    ///
    /// This method is only available when the `use_std` feature of this
    /// library is activated, and it is activated by default.
    ///
    /// # Examples
    ///
    /// ```
    /// use std::thread;
    ///
    /// use futures::prelude::*;
    /// use futures::sync::mpsc;
    ///
    /// let (mut tx, rx) = mpsc::channel(1);
    ///
    /// thread::spawn(|| {
    ///     for i in (0..5).rev() {
    ///         tx = tx.send(i + 1).wait().unwrap();
    ///     }
    /// });
    ///
    /// let mut result = rx.collect();
    /// assert_eq!(result.wait(), Ok(vec![5, 4, 3, 2, 1]));
    /// ```
    #[cfg(feature = "use_std")]
    fn collect(self) -> Collect<Self>
        where Self: Sized
    {
        collect::new(self)
    }

    /// Concatenate all results of a stream into a single extendable
    /// destination, returning a future representing the end result.
    ///
    /// This combinator will extend the first item with the contents
    /// of all the successful results of the stream. If the stream is
    /// empty, the default value will be returned. If an error occurs,
    /// all the results will be dropped and the error will be returned.
    ///
    /// The name `concat2` is an intermediate measure until the release of
    /// futures 0.2, at which point it will be renamed back to `concat`.
    ///
    /// # Examples
    ///
    /// ```
    /// use std::thread;
    ///
    /// use futures::prelude::*;
    /// use futures::sync::mpsc;
    ///
    /// let (mut tx, rx) = mpsc::channel(1);
    ///
    /// thread::spawn(move || {
    ///     for i in (0..3).rev() {
    ///         let n = i * 3;
    ///         tx = tx.send(vec![n + 1, n + 2, n + 3]).wait().unwrap();
    ///     }
    /// });
    /// let result = rx.concat2();
    /// assert_eq!(result.wait(), Ok(vec![7, 8, 9, 4, 5, 6, 1, 2, 3]));
    /// ```
    fn concat2(self) -> Concat2<Self>
        where Self: Sized,
              Self::Item: Extend<<<Self as Stream>::Item as IntoIterator>::Item> + IntoIterator + Default,
    {
        concat::new2(self)
    }

    /// Concatenate all results of a stream into a single extendable
    /// destination, returning a future representing the end result.
    ///
    /// This combinator will extend the first item with the contents
    /// of all the successful results of the stream. If an error occurs,
    /// all the results will be dropped and the error will be returned.
    ///
    /// # Examples
    ///
    /// ```
    /// use std::thread;
    ///
    /// use futures::prelude::*;
    /// use futures::sync::mpsc;
    ///
    /// let (mut tx, rx) = mpsc::channel(1);
    ///
    /// thread::spawn(move || {
    ///     for i in (0..3).rev() {
    ///         let n = i * 3;
    ///         tx = tx.send(vec![n + 1, n + 2, n + 3]).wait().unwrap();
    ///     }
    /// });
    /// let result = rx.concat();
    /// assert_eq!(result.wait(), Ok(vec![7, 8, 9, 4, 5, 6, 1, 2, 3]));
    /// ```
    ///
    /// # Panics
    ///
    /// It's important to note that this function will panic if the stream
    /// is empty, which is the reason for its deprecation.
    #[deprecated(since="0.1.14", note="please use `Stream::concat2` instead")]
    #[allow(deprecated)]
    fn concat(self) -> Concat<Self>
        where Self: Sized,
              Self::Item: Extend<<<Self as Stream>::Item as IntoIterator>::Item> + IntoIterator,
    {
        concat::new(self)
    }

    /// Execute an accumulating computation over a stream, collecting all the
    /// values into one final result.
    ///
    /// This combinator will collect all successful results of this stream
    /// according to the closure provided. The initial state is also provided to
    /// this method and then is returned again by each execution of the closure.
    /// Once the entire stream has been exhausted the returned future will
    /// resolve to this value.
    ///
    /// If an error happens then collected state will be dropped and the error
    /// will be returned.
    ///
    /// # Examples
    ///
    /// ```
    /// use futures::prelude::*;
    /// use futures::stream;
    /// use futures::future;
    ///
    /// let number_stream = stream::iter_ok::<_, ()>(0..6);
    /// let sum = number_stream.fold(0, |acc, x| future::ok(acc + x));
    /// assert_eq!(sum.wait(), Ok(15));
    /// ```
    fn fold<F, T, Fut>(self, init: T, f: F) -> Fold<Self, F, Fut, T>
        where F: FnMut(T, Self::Item) -> Fut,
              Fut: IntoFuture<Item = T>,
              Self::Error: From<Fut::Error>,
              Self: Sized
    {
        fold::new(self, f, init)
    }

    /// Flattens a stream of streams into just one continuous stream.
    ///
    /// If this stream's elements are themselves streams then this combinator
    /// will flatten out the entire stream to one long chain of elements. Any
    /// errors are passed through without looking at them, but otherwise each
    /// individual stream will get exhausted before moving on to the next.
    ///
    /// ```
    /// use std::thread;
    ///
    /// use futures::prelude::*;
    /// use futures::sync::mpsc;
    ///
    /// let (tx1, rx1) = mpsc::channel::<i32>(1);
    /// let (tx2, rx2) = mpsc::channel::<i32>(1);
    /// let (tx3, rx3) = mpsc::channel(1);
    ///
    /// thread::spawn(|| {
    ///     tx1.send(1).wait().unwrap()
    ///        .send(2).wait().unwrap();
    /// });
    /// thread::spawn(|| {
    ///     tx2.send(3).wait().unwrap()
    ///        .send(4).wait().unwrap();
    /// });
    /// thread::spawn(|| {
    ///     tx3.send(rx1).wait().unwrap()
    ///        .send(rx2).wait().unwrap();
    /// });
    ///
    /// let mut result = rx3.flatten().collect();
    /// assert_eq!(result.wait(), Ok(vec![1, 2, 3, 4]));
    /// ```
    fn flatten(self) -> Flatten<Self>
        where Self::Item: Stream,
              <Self::Item as Stream>::Error: From<Self::Error>,
              Self: Sized
    {
        flatten::new(self)
    }

    /// Skip elements on this stream while the predicate provided resolves to
    /// `true`.
    ///
    /// This function, like `Iterator::skip_while`, will skip elements on the
    /// stream until the `predicate` resolves to `false`. Once one element
    /// returns false all future elements will be returned from the underlying
    /// stream.
    fn skip_while<P, R>(self, pred: P) -> SkipWhile<Self, P, R>
        where P: FnMut(&Self::Item) -> R,
              R: IntoFuture<Item=bool, Error=Self::Error>,
              Self: Sized
    {
        skip_while::new(self, pred)
    }

    /// Take elements from this stream while the predicate provided resolves to
    /// `true`.
    ///
    /// This function, like `Iterator::take_while`, will take elements from the
    /// stream until the `predicate` resolves to `false`. Once one element
    /// returns false it will always return that the stream is done.
    fn take_while<P, R>(self, pred: P) -> TakeWhile<Self, P, R>
        where P: FnMut(&Self::Item) -> R,
              R: IntoFuture<Item=bool, Error=Self::Error>,
              Self: Sized
    {
        take_while::new(self, pred)
    }

    /// Runs this stream to completion, executing the provided closure for each
    /// element on the stream.
    ///
    /// The closure provided will be called for each item this stream resolves
    /// to successfully, producing a future. That future will then be executed
    /// to completion before moving on to the next item.
    ///
    /// The returned value is a `Future` where the `Item` type is `()` and
    /// errors are otherwise threaded through. Any error on the stream or in the
    /// closure will cause iteration to be halted immediately and the future
    /// will resolve to that error.
    ///
    /// To process each item in the stream and produce another stream instead
    /// of a single future, use `and_then` instead.
    fn for_each<F, U>(self, f: F) -> ForEach<Self, F, U>
        where F: FnMut(Self::Item) -> U,
              U: IntoFuture<Item=(), Error = Self::Error>,
              Self: Sized
    {
        for_each::new(self, f)
    }

    /// Map this stream's error to any error implementing `From` for
    /// this stream's `Error`, returning a new stream.
    ///
    /// This function does for streams what `try!` does for `Result`,
    /// by letting the compiler infer the type of the resulting error.
    /// Just as `map_err` above, this is useful for example to ensure
    /// that streams have the same error type when used with
    /// combinators.
    ///
    /// Note that this function consumes the receiving stream and returns a
    /// wrapped version of it.
    fn from_err<E: From<Self::Error>>(self) -> FromErr<Self, E>
        where Self: Sized,
    {
        from_err::new(self)
    }

    /// Creates a new stream of at most `amt` items of the underlying stream.
    ///
    /// Once `amt` items have been yielded from this stream then it will always
    /// return that the stream is done.
    ///
    /// # Errors
    ///
    /// Any errors yielded from underlying stream, before the desired amount of
    /// items is reached, are passed through and do not affect the total number
    /// of items taken.
    fn take(self, amt: u64) -> Take<Self>
        where Self: Sized
    {
        take::new(self, amt)
    }

    /// Creates a new stream which skips `amt` items of the underlying stream.
    ///
    /// Once `amt` items have been skipped from this stream then it will always
    /// return the remaining items on this stream.
    ///
    /// # Errors
    ///
    /// All errors yielded from underlying stream are passed through and do not
    /// affect the total number of items skipped.
    fn skip(self, amt: u64) -> Skip<Self>
        where Self: Sized
    {
        skip::new(self, amt)
    }

    /// Fuse a stream such that `poll` will never again be called once it has
    /// finished.
    ///
    /// Currently once a stream has returned `None` from `poll` any further
    /// calls could exhibit bad behavior such as block forever, panic, never
    /// return, etc. If it is known that `poll` may be called after stream has
    /// already finished, then this method can be used to ensure that it has
    /// defined semantics.
    ///
    /// Once a stream has been `fuse`d and it finishes, then it will forever
    /// return `None` from `poll`. This, unlike for the traits `poll` method,
    /// is guaranteed.
    ///
    /// Also note that as soon as this stream returns `None` it will be dropped
    /// to reclaim resources associated with it.
    fn fuse(self) -> Fuse<Self>
        where Self: Sized
    {
        fuse::new(self)
    }

    /// Borrows a stream, rather than consuming it.
    ///
    /// This is useful to allow applying stream adaptors while still retaining
    /// ownership of the original stream.
    ///
    /// ```
    /// use futures::prelude::*;
    /// use futures::stream;
    /// use futures::future;
    ///
    /// let mut stream = stream::iter_ok::<_, ()>(1..5);
    ///
    /// let sum = stream.by_ref().take(2).fold(0, |a, b| future::ok(a + b)).wait();
    /// assert_eq!(sum, Ok(3));
    ///
    /// // You can use the stream again
    /// let sum = stream.take(2).fold(0, |a, b| future::ok(a + b)).wait();
    /// assert_eq!(sum, Ok(7));
    /// ```
    fn by_ref(&mut self) -> &mut Self
        where Self: Sized
    {
        self
    }

    /// Catches unwinding panics while polling the stream.
    ///
    /// Caught panic (if any) will be the last element of the resulting stream.
    ///
    /// In general, panics within a stream can propagate all the way out to the
    /// task level. This combinator makes it possible to halt unwinding within
    /// the stream itself. It's most commonly used within task executors. This
    /// method should not be used for error handling.
    ///
    /// Note that this method requires the `UnwindSafe` bound from the standard
    /// library. This isn't always applied automatically, and the standard
    /// library provides an `AssertUnwindSafe` wrapper type to apply it
    /// after-the fact. To assist using this method, the `Stream` trait is also
    /// implemented for `AssertUnwindSafe<S>` where `S` implements `Stream`.
    ///
    /// This method is only available when the `use_std` feature of this
    /// library is activated, and it is activated by default.
    ///
    /// # Examples
    ///
    /// ```rust
    /// use futures::prelude::*;
    /// use futures::stream;
    ///
    /// let stream = stream::iter_ok::<_, bool>(vec![Some(10), None, Some(11)]);
    /// // panic on second element
    /// let stream_panicking = stream.map(|o| o.unwrap());
    /// let mut iter = stream_panicking.catch_unwind().wait();
    ///
    /// assert_eq!(Ok(10), iter.next().unwrap().ok().unwrap());
    /// assert!(iter.next().unwrap().is_err());
    /// assert!(iter.next().is_none());
    /// ```
    #[cfg(feature = "use_std")]
    fn catch_unwind(self) -> CatchUnwind<Self>
        where Self: Sized + std::panic::UnwindSafe
    {
        catch_unwind::new(self)
    }

    /// An adaptor for creating a buffered list of pending futures.
    ///
    /// If this stream's item can be converted into a future, then this adaptor
    /// will buffer up to at most `amt` futures and then return results in the
    /// same order as the underlying stream. No more than `amt` futures will be
    /// buffered at any point in time, and less than `amt` may also be buffered
    /// depending on the state of each future.
    ///
    /// The returned stream will be a stream of each future's result, with
    /// errors passed through whenever they occur.
    ///
    /// This method is only available when the `use_std` feature of this
    /// library is activated, and it is activated by default.
    #[cfg(feature = "use_std")]
    fn buffered(self, amt: usize) -> Buffered<Self>
        where Self::Item: IntoFuture<Error = <Self as Stream>::Error>,
              Self: Sized
    {
        buffered::new(self, amt)
    }

    /// An adaptor for creating a buffered list of pending futures (unordered).
    ///
    /// If this stream's item can be converted into a future, then this adaptor
    /// will buffer up to `amt` futures and then return results in the order
    /// in which they complete. No more than `amt` futures will be buffered at
    /// any point in time, and less than `amt` may also be buffered depending on
    /// the state of each future.
    ///
    /// The returned stream will be a stream of each future's result, with
    /// errors passed through whenever they occur.
    ///
    /// This method is only available when the `use_std` feature of this
    /// library is activated, and it is activated by default.
    #[cfg(feature = "use_std")]
    fn buffer_unordered(self, amt: usize) -> BufferUnordered<Self>
        where Self::Item: IntoFuture<Error = <Self as Stream>::Error>,
              Self: Sized
    {
        buffer_unordered::new(self, amt)
    }

    /// An adapter for merging the output of two streams.
    ///
    /// The merged stream produces items from one or both of the underlying
    /// streams as they become available. Errors, however, are not merged: you
    /// get at most one error at a time.
    #[deprecated(note = "functionality provided by `select` now")]
    #[allow(deprecated)]
    fn merge<S>(self, other: S) -> Merge<Self, S>
        where S: Stream<Error = Self::Error>,
              Self: Sized,
    {
        merge::new(self, other)
    }

    /// An adapter for zipping two streams together.
    ///
    /// The zipped stream waits for both streams to produce an item, and then
    /// returns that pair. If an error happens, then that error will be returned
    /// immediately. If either stream ends then the zipped stream will also end.
    fn zip<S>(self, other: S) -> Zip<Self, S>
        where S: Stream<Error = Self::Error>,
              Self: Sized,
    {
        zip::new(self, other)
    }

    /// Adapter for chaining two stream.
    ///
    /// The resulting stream emits elements from the first stream, and when
    /// first stream reaches the end, emits the elements from the second stream.
    ///
    /// ```rust
    /// use futures::prelude::*;
    /// use futures::stream;
    ///
    /// let stream1 = stream::iter_result(vec![Ok(10), Err(false)]);
    /// let stream2 = stream::iter_result(vec![Err(true), Ok(20)]);
    /// let mut chain = stream1.chain(stream2).wait();
    ///
    /// assert_eq!(Some(Ok(10)), chain.next());
    /// assert_eq!(Some(Err(false)), chain.next());
    /// assert_eq!(Some(Err(true)), chain.next());
    /// assert_eq!(Some(Ok(20)), chain.next());
    /// assert_eq!(None, chain.next());
    /// ```
    fn chain<S>(self, other: S) -> Chain<Self, S>
        where S: Stream<Item = Self::Item, Error = Self::Error>,
              Self: Sized
    {
        chain::new(self, other)
    }

    /// Creates a new stream which exposes a `peek` method.
    ///
    /// Calling `peek` returns a reference to the next item in the stream.
    fn peekable(self) -> Peekable<Self>
        where Self: Sized
    {
        peek::new(self)
    }

    /// An adaptor for chunking up items of the stream inside a vector.
    ///
    /// This combinator will attempt to pull items from this stream and buffer
    /// them into a local vector. At most `capacity` items will get buffered
    /// before they're yielded from the returned stream.
    ///
    /// Note that the vectors returned from this iterator may not always have
    /// `capacity` elements. If the underlying stream ended and only a partial
    /// vector was created, it'll be returned. Additionally if an error happens
    /// from the underlying stream then the currently buffered items will be
    /// yielded.
    ///
    /// Errors are passed through the stream unbuffered.
    ///
    /// This method is only available when the `use_std` feature of this
    /// library is activated, and it is activated by default.
    ///
    /// # Panics
    ///
    /// This method will panic of `capacity` is zero.
    #[cfg(feature = "use_std")]
    fn chunks(self, capacity: usize) -> Chunks<Self>
        where Self: Sized
    {
        chunks::new(self, capacity)
    }

    /// Creates a stream that selects the next element from either this stream
    /// or the provided one, whichever is ready first.
    ///
    /// This combinator will attempt to pull items from both streams. Each
    /// stream will be polled in a round-robin fashion, and whenever a stream is
    /// ready to yield an item that item is yielded.
    ///
    /// The `select` function is similar to `merge` except that it requires both
    /// streams to have the same item and error types.
    ///
    /// Error are passed through from either stream.
    fn select<S>(self, other: S) -> Select<Self, S>
        where S: Stream<Item = Self::Item, Error = Self::Error>,
              Self: Sized,
    {
        select::new(self, other)
    }

    /// A future that completes after the given stream has been fully processed
    /// into the sink, including flushing.
    ///
    /// This future will drive the stream to keep producing items until it is
    /// exhausted, sending each item to the sink. It will complete once both the
    /// stream is exhausted, and the sink has fully processed received item,
    /// flushed successfully, and closed successfully.
    ///
    /// Doing `stream.forward(sink)` is roughly equivalent to
    /// `sink.send_all(stream)`. The returned future will exhaust all items from
    /// `self`, sending them all to `sink`. Furthermore the `sink` will be
    /// closed and flushed.
    ///
    /// On completion, the pair `(stream, sink)` is returned.
    fn forward<S>(self, sink: S) -> Forward<Self, S>
        where S: Sink<SinkItem = Self::Item>,
              Self::Error: From<S::SinkError>,
              Self: Sized
    {
        forward::new(self, sink)
    }

    /// Splits this `Stream + Sink` object into separate `Stream` and `Sink`
    /// objects.
    ///
    /// This can be useful when you want to split ownership between tasks, or
    /// allow direct interaction between the two objects (e.g. via
    /// `Sink::send_all`).
    ///
    /// This method is only available when the `use_std` feature of this
    /// library is activated, and it is activated by default.
    #[cfg(feature = "use_std")]
    fn split(self) -> (SplitSink<Self>, SplitStream<Self>)
        where Self: super::sink::Sink + Sized
    {
        split::split(self)
    }

    /// Do something with each item of this stream, afterwards passing it on.
    ///
    /// This is similar to the `Iterator::inspect` method in the standard
    /// library where it allows easily inspecting each value as it passes
    /// through the stream, for example to debug what's going on.
    fn inspect<F>(self, f: F) -> Inspect<Self, F>
        where F: FnMut(&Self::Item),
              Self: Sized,
    {
        inspect::new(self, f)
    }

    /// Do something with the error of this stream, afterwards passing it on.
    ///
    /// This is similar to the `Stream::inspect` method where it allows
    /// easily inspecting the error as it passes through the stream, for
    /// example to debug what's going on.
    fn inspect_err<F>(self, f: F) -> InspectErr<Self, F>
        where F: FnMut(&Self::Error),
              Self: Sized,
    {
        inspect_err::new(self, f)
    }
}

impl<'a, S: ?Sized + Stream> Stream for &'a mut S {
    type Item = S::Item;
    type Error = S::Error;

    fn poll(&mut self) -> Poll<Option<Self::Item>, Self::Error> {
        (**self).poll()
    }
}

/// Converts a list of futures into a `Stream` of results from the futures.
///
/// This function will take an list of futures (e.g. a vector, an iterator,
/// etc), and return a stream. The stream will yield items as they become
/// available on the futures internally, in the order that they become
/// available. This function is similar to `buffer_unordered` in that it may
/// return items in a different order than in the list specified.
///
/// Note that the returned set can also be used to dynamically push more
/// futures into the set as they become available.
#[cfg(feature = "use_std")]
pub fn futures_unordered<I>(futures: I) -> FuturesUnordered<<I::Item as IntoFuture>::Future>
    where I: IntoIterator,
        I::Item: IntoFuture
{
    let mut set = FuturesUnordered::new();

    for future in futures {
        set.push(future.into_future());
    }

    return set
}