[][src]Trait async_std::stream::Stream

pub trait Stream {
    type Item;
    fn poll_next(
        self: Pin<&mut Self>,
        cx: &mut Context
    ) -> Poll<Option<Self::Item>>; fn next(&mut self) -> ImplFuture<Option<Self::Item>>
    where
        Self: Unpin
, { ... }
fn take(self, n: usize) -> Take<Self>
    where
        Self: Sized
, { ... }
fn step_by(self, step: usize) -> StepBy<Self>
    where
        Self: Sized
, { ... }
fn chain<U>(self, other: U) -> Chain<Self, U>
    where
        Self: Sized,
        U: Stream<Item = Self::Item> + Sized
, { ... }
fn enumerate(self) -> Enumerate<Self>
    where
        Self: Sized
, { ... }
fn map<B, F>(self, f: F) -> Map<Self, F, Self::Item, B>
    where
        Self: Sized,
        F: FnMut(Self::Item) -> B
, { ... }
fn inspect<F>(self, f: F) -> Inspect<Self, F, Self::Item>
    where
        Self: Sized,
        F: FnMut(&Self::Item)
, { ... }
fn fuse(self) -> Fuse<Self>
    where
        Self: Sized
, { ... }
fn filter<P>(self, predicate: P) -> Filter<Self, P, Self::Item>
    where
        Self: Sized,
        P: FnMut(&Self::Item) -> bool
, { ... }
fn filter_map<B, F>(self, f: F) -> FilterMap<Self, F, Self::Item, B>
    where
        Self: Sized,
        F: FnMut(Self::Item) -> Option<B>
, { ... }
fn min_by<F>(self, compare: F) -> ImplFuture<Option<Self::Item>>
    where
        Self: Sized,
        F: FnMut(&Self::Item, &Self::Item) -> Ordering
, { ... }
fn nth(&mut self, n: usize) -> ImplFuture<Option<Self::Item>>
    where
        Self: Sized
, { ... }
fn all<F>(&mut self, f: F) -> ImplFuture<bool>
    where
        Self: Unpin + Sized,
        F: FnMut(Self::Item) -> bool
, { ... }
fn find<P>(&mut self, p: P) -> ImplFuture<Option<Self::Item>>
    where
        Self: Sized,
        P: FnMut(&Self::Item) -> bool
, { ... }
fn find_map<F, B>(&mut self, f: F) -> ImplFuture<Option<B>>
    where
        Self: Sized,
        F: FnMut(Self::Item) -> Option<B>
, { ... }
fn fold<B, F>(self, init: B, f: F) -> ImplFuture<B>
    where
        Self: Sized,
        F: FnMut(B, Self::Item) -> B
, { ... }
fn for_each<F>(self, f: F) -> ImplFuture<()>
    where
        Self: Sized,
        F: FnMut(Self::Item)
, { ... }
fn any<F>(&mut self, f: F) -> ImplFuture<bool>
    where
        Self: Unpin + Sized,
        F: FnMut(Self::Item) -> bool
, { ... }
fn scan<St, B, F>(self, initial_state: St, f: F) -> Scan<Self, St, F>
    where
        Self: Sized,
        F: FnMut(&mut St, Self::Item) -> Option<B>
, { ... }
fn skip_while<P>(self, predicate: P) -> SkipWhile<Self, P, Self::Item>
    where
        Self: Sized,
        P: FnMut(&Self::Item) -> bool
, { ... }
fn skip(self, n: usize) -> Skip<Self>
    where
        Self: Sized
, { ... }
fn try_for_each<F, E>(self, f: F) -> ImplFuture<E>
    where
        Self: Sized,
        F: FnMut(Self::Item) -> Result<(), E>
, { ... }
fn zip<U>(self, other: U) -> Zip<Self, U>
    where
        Self: Sized + Stream,
        U: Stream
, { ... }
#[must_use = "if you really need to exhaust the iterator, consider `.for_each(drop)` instead (TODO)"] fn collect<'a, B>(self) -> ImplFuture<'a, B>
    where
        Self: Sized + 'a,
        B: FromStream<Self::Item>
, { ... } }

An asynchronous stream of values.

This trait is a re-export of futures::stream::Stream and is an async version of std::iter::Iterator.

The provided methods do not really exist in the trait itself, but they become available when the prelude is imported:

use async_std::prelude::*;

Associated Types

type Item

The type of items yielded by this stream.

Loading content...

Required methods

fn poll_next(self: Pin<&mut Self>, cx: &mut Context) -> Poll<Option<Self::Item>>

Attempts to receive the next item from the stream.

There are several possible return values:

  • Poll::Pending means this stream's next value is not ready yet.
  • Poll::Ready(None) means this stream has been exhausted.
  • Poll::Ready(Some(item)) means item was received out of the stream.

Examples

use std::pin::Pin;

use async_std::prelude::*;
use async_std::stream;
use async_std::task::{Context, Poll};

fn increment(
    s: impl Stream<Item = i32> + Unpin,
) -> impl Stream<Item = i32> + Unpin {
    struct Increment<S>(S);

    impl<S: Stream<Item = i32> + Unpin> Stream for Increment<S> {
        type Item = S::Item;

        fn poll_next(
            mut self: Pin<&mut Self>,
            cx: &mut Context<'_>,
        ) -> Poll<Option<Self::Item>> {
            match Pin::new(&mut self.0).poll_next(cx) {
                Poll::Pending => Poll::Pending,
                Poll::Ready(None) => Poll::Ready(None),
                Poll::Ready(Some(item)) => Poll::Ready(Some(item + 1)),
            }
        }
    }

    Increment(s)
}

let mut s = increment(stream::once(7));

assert_eq!(s.next().await, Some(8));
assert_eq!(s.next().await, None);
Loading content...

Provided methods

fn next(&mut self) -> ImplFuture<Option<Self::Item>> where
    Self: Unpin

Advances the stream and returns the next value.

Returns None when iteration is finished. Individual stream implementations may choose to resume iteration, and so calling next() again may or may not eventually start returning more values.

Examples

use async_std::prelude::*;
use async_std::stream;

let mut s = stream::once(7);

assert_eq!(s.next().await, Some(7));
assert_eq!(s.next().await, None);

fn take(self, n: usize) -> Take<Self> where
    Self: Sized

Creates a stream that yields its first n elements.

Examples

use async_std::prelude::*;
use async_std::stream;

let mut s = stream::repeat(9).take(3);

while let Some(v) = s.next().await {
    assert_eq!(v, 9);
}

fn step_by(self, step: usize) -> StepBy<Self> where
    Self: Sized

Creates a stream that yields each `step`th element.

Panics

This method will panic if the given step is 0.

Examples

Basic usage:

use async_std::prelude::*;
use std::collections::VecDeque;

let s: VecDeque<_> = vec![0u8, 1, 2, 3, 4].into_iter().collect();
let mut stepped = s.step_by(2);

assert_eq!(stepped.next().await, Some(0));
assert_eq!(stepped.next().await, Some(2));
assert_eq!(stepped.next().await, Some(4));
assert_eq!(stepped.next().await, None);

fn chain<U>(self, other: U) -> Chain<Self, U> where
    Self: Sized,
    U: Stream<Item = Self::Item> + Sized

Takes two streams and creates a new stream over both in sequence.

Examples

Basic usage:

use async_std::prelude::*;
use std::collections::VecDeque;

let first: VecDeque<_> = vec![0u8, 1].into_iter().collect();
let second: VecDeque<_> = vec![2, 3].into_iter().collect();
let mut c = first.chain(second);

assert_eq!(c.next().await, Some(0));
assert_eq!(c.next().await, Some(1));
assert_eq!(c.next().await, Some(2));
assert_eq!(c.next().await, Some(3));
assert_eq!(c.next().await, None);

fn enumerate(self) -> Enumerate<Self> where
    Self: Sized

Creates a stream that gives the current element's count as well as the next value.

Overflow behaviour.

This combinator does no guarding against overflows.

Examples

use async_std::prelude::*;
use std::collections::VecDeque;

let s: VecDeque<_> = vec!['a', 'b', 'c'].into_iter().collect();
let mut s = s.enumerate();

assert_eq!(s.next().await, Some((0, 'a')));
assert_eq!(s.next().await, Some((1, 'b')));
assert_eq!(s.next().await, Some((2, 'c')));
assert_eq!(s.next().await, None);

fn map<B, F>(self, f: F) -> Map<Self, F, Self::Item, B> where
    Self: Sized,
    F: FnMut(Self::Item) -> B, 

Takes a closure and creates a stream that calls that closure on every element of this stream.

Examples

use async_std::prelude::*;
use std::collections::VecDeque;

let s: VecDeque<_> = vec![1, 2, 3].into_iter().collect();
let mut s = s.map(|x| 2 * x);

assert_eq!(s.next().await, Some(2));
assert_eq!(s.next().await, Some(4));
assert_eq!(s.next().await, Some(6));
assert_eq!(s.next().await, None);

fn inspect<F>(self, f: F) -> Inspect<Self, F, Self::Item> where
    Self: Sized,
    F: FnMut(&Self::Item), 

A combinator that does something with each element in the stream, passing the value on.

Examples

Basic usage:

use async_std::prelude::*;
use std::collections::VecDeque;

let a: VecDeque<_> = vec![1u8, 2, 3, 4, 5].into_iter().collect();
let sum = a
        .inspect(|x| println!("about to filter {}", x))
        .filter(|x| x % 2 == 0)
        .inspect(|x| println!("made it through filter: {}", x))
        .fold(0, |sum, i| sum + i).await;

assert_eq!(sum, 6);

fn fuse(self) -> Fuse<Self> where
    Self: Sized

Transforms this Stream into a "fused" Stream such that after the first time poll_next returns Poll::Ready(None), all future calls to poll_next will also return Poll::Ready(None).

Examples

use async_std::prelude::*;
use async_std::stream;

let mut s = stream::once(1).fuse();
assert_eq!(s.next().await, Some(1));
assert_eq!(s.next().await, None);
assert_eq!(s.next().await, None);

fn filter<P>(self, predicate: P) -> Filter<Self, P, Self::Item> where
    Self: Sized,
    P: FnMut(&Self::Item) -> bool

Creates a stream that uses a predicate to determine if an element should be yielded.

Examples

Basic usage:

use std::collections::VecDeque;

use async_std::prelude::*;

let s: VecDeque<usize> = vec![1, 2, 3, 4].into_iter().collect();
let mut s = s.filter(|i| i % 2 == 0);

assert_eq!(s.next().await, Some(2));
assert_eq!(s.next().await, Some(4));
assert_eq!(s.next().await, None);

fn filter_map<B, F>(self, f: F) -> FilterMap<Self, F, Self::Item, B> where
    Self: Sized,
    F: FnMut(Self::Item) -> Option<B>, 

Both filters and maps a stream.

Examples

Basic usage:

use std::collections::VecDeque;

use async_std::prelude::*;

let s: VecDeque<&str> = vec!["1", "lol", "3", "NaN", "5"].into_iter().collect();

let mut parsed = s.filter_map(|a| a.parse::<u32>().ok());

let one = parsed.next().await;
assert_eq!(one, Some(1));

let three = parsed.next().await;
assert_eq!(three, Some(3));

let five = parsed.next().await;
assert_eq!(five, Some(5));

let end = parsed.next().await;
assert_eq!(end, None);

fn min_by<F>(self, compare: F) -> ImplFuture<Option<Self::Item>> where
    Self: Sized,
    F: FnMut(&Self::Item, &Self::Item) -> Ordering

Returns the element that gives the minimum value with respect to the specified comparison function. If several elements are equally minimum, the first element is returned. If the stream is empty, None is returned.

Examples

use std::collections::VecDeque;

use async_std::prelude::*;

let s: VecDeque<usize> = vec![1, 2, 3].into_iter().collect();

let min = s.clone().min_by(|x, y| x.cmp(y)).await;
assert_eq!(min, Some(1));

let min = s.min_by(|x, y| y.cmp(x)).await;
assert_eq!(min, Some(3));

let min = VecDeque::<usize>::new().min_by(|x, y| x.cmp(y)).await;
assert_eq!(min, None);

fn nth(&mut self, n: usize) -> ImplFuture<Option<Self::Item>> where
    Self: Sized

Returns the nth element of the stream.

Examples

Basic usage:

use std::collections::VecDeque;

use async_std::prelude::*;

let mut s: VecDeque<usize> = vec![1, 2, 3].into_iter().collect();

let second = s.nth(1).await;
assert_eq!(second, Some(2));

Calling nth() multiple times:

use std::collections::VecDeque;

use async_std::prelude::*;

let mut s: VecDeque<usize> = vec![1, 2, 3].into_iter().collect();

let second = s.nth(0).await;
assert_eq!(second, Some(1));

let second = s.nth(0).await;
assert_eq!(second, Some(2));

Returning None if the stream finished before returning n elements:

use std::collections::VecDeque;

use async_std::prelude::*;

let mut s: VecDeque<usize> = vec![1, 2, 3].into_iter().collect();

let fourth = s.nth(4).await;
assert_eq!(fourth, None);

fn all<F>(&mut self, f: F) -> ImplFuture<bool> where
    Self: Unpin + Sized,
    F: FnMut(Self::Item) -> bool

Tests if every element of the stream matches a predicate.

all() takes a closure that returns true or false. It applies this closure to each element of the stream, and if they all return true, then so does all(). If any of them return false, it returns false.

all() is short-circuiting; in other words, it will stop processing as soon as it finds a false, given that no matter what else happens, the result will also be false.

An empty stream returns true.

Examples

Basic usage:

use async_std::prelude::*;
use async_std::stream;

let mut s = stream::repeat::<u32>(42).take(3);
assert!(s.all(|x| x ==  42).await);

Empty stream:

use async_std::prelude::*;
use async_std::stream;

let mut s = stream::empty::<u32>();
assert!(s.all(|_| false).await);

fn find<P>(&mut self, p: P) -> ImplFuture<Option<Self::Item>> where
    Self: Sized,
    P: FnMut(&Self::Item) -> bool

Searches for an element in a stream that satisfies a predicate.

Examples

Basic usage:

use async_std::prelude::*;
use std::collections::VecDeque;

let mut s: VecDeque<usize> = vec![1, 2, 3].into_iter().collect();
let res = s.find(|x| *x == 2).await;
assert_eq!(res, Some(2));

Resuming after a first find:

use async_std::prelude::*;
use std::collections::VecDeque;

let mut s: VecDeque<usize> = vec![1, 2, 3].into_iter().collect();
let res = s.find(|x| *x == 2).await;
assert_eq!(res, Some(2));

let next = s.next().await;
assert_eq!(next, Some(3));

fn find_map<F, B>(&mut self, f: F) -> ImplFuture<Option<B>> where
    Self: Sized,
    F: FnMut(Self::Item) -> Option<B>, 

Applies a function to the elements of the stream and returns the first non-`None` result.

use async_std::prelude::*;
use std::collections::VecDeque;

let mut s: VecDeque<&str> = vec!["lol", "NaN", "2", "5"].into_iter().collect();
let first_number = s.find_map(|s| s.parse().ok()).await;

assert_eq!(first_number, Some(2));

fn fold<B, F>(self, init: B, f: F) -> ImplFuture<B> where
    Self: Sized,
    F: FnMut(B, Self::Item) -> B, 

A combinator that applies a function to every element in a stream producing a single, final value.

Examples

Basic usage:

use async_std::prelude::*;
use std::collections::VecDeque;

let s: VecDeque<usize> = vec![1, 2, 3].into_iter().collect();
let sum = s.fold(0, |acc, x| acc + x).await;

assert_eq!(sum, 6);

fn for_each<F>(self, f: F) -> ImplFuture<()> where
    Self: Sized,
    F: FnMut(Self::Item), 

Calls a closure on each element of the stream.

Examples

use async_std::prelude::*;
use std::collections::VecDeque;
use std::sync::mpsc::channel;

let (tx, rx) = channel();

let s: VecDeque<usize> = vec![1, 2, 3].into_iter().collect();
let sum = s.for_each(move |x| tx.clone().send(x).unwrap()).await;

let v: Vec<_> = rx.iter().collect();

assert_eq!(v, vec![1, 2, 3]);

fn any<F>(&mut self, f: F) -> ImplFuture<bool> where
    Self: Unpin + Sized,
    F: FnMut(Self::Item) -> bool

Tests if any element of the stream matches a predicate.

any() takes a closure that returns true or false. It applies this closure to each element of the stream, and if any of them return true, then so does any(). If they all return false, it returns false.

any() is short-circuiting; in other words, it will stop processing as soon as it finds a true, given that no matter what else happens, the result will also be true.

An empty stream returns false.

Examples

Basic usage:

use async_std::prelude::*;
use async_std::stream;

let mut s = stream::repeat::<u32>(42).take(3);
assert!(s.any(|x| x ==  42).await);

Empty stream:

use async_std::prelude::*;
use async_std::stream;

let mut s = stream::empty::<u32>();
assert!(!s.any(|_| false).await);

fn scan<St, B, F>(self, initial_state: St, f: F) -> Scan<Self, St, F> where
    Self: Sized,
    F: FnMut(&mut St, Self::Item) -> Option<B>, 

A stream adaptor similar to fold that holds internal state and produces a new stream.

scan() takes two arguments: an initial value which seeds the internal state, and a closure with two arguments, the first being a mutable reference to the internal state and the second a stream element. The closure can assign to the internal state to share state between iterations.

On iteration, the closure will be applied to each element of the stream and the return value from the closure, an Option, is yielded by the stream.

Examples

use std::collections::VecDeque;

use async_std::prelude::*;

let s: VecDeque<isize> = vec![1, 2, 3].into_iter().collect();
let mut s = s.scan(1, |state, x| {
    *state = *state * x;
    Some(-*state)
});

assert_eq!(s.next().await, Some(-1));
assert_eq!(s.next().await, Some(-2));
assert_eq!(s.next().await, Some(-6));
assert_eq!(s.next().await, None);

fn skip_while<P>(self, predicate: P) -> SkipWhile<Self, P, Self::Item> where
    Self: Sized,
    P: FnMut(&Self::Item) -> bool

Combinator that skips elements based on a predicate.

Takes a closure argument. It will call this closure on every element in the stream and ignore elements until it returns false.

After false is returned, SkipWhile's job is over and all further elements in the stream are yielded.

Examples

use std::collections::VecDeque;

use async_std::prelude::*;

let a: VecDeque<_> = vec![-1i32, 0, 1].into_iter().collect();
let mut s = a.skip_while(|x| x.is_negative());

assert_eq!(s.next().await, Some(0));
assert_eq!(s.next().await, Some(1));
assert_eq!(s.next().await, None);

fn skip(self, n: usize) -> Skip<Self> where
    Self: Sized

Creates a combinator that skips the first n elements.

Examples

use std::collections::VecDeque;

use async_std::prelude::*;

let s: VecDeque<usize> = vec![1, 2, 3].into_iter().collect();
let mut skipped = s.skip(2);

assert_eq!(skipped.next().await, Some(3));
assert_eq!(skipped.next().await, None);

fn try_for_each<F, E>(self, f: F) -> ImplFuture<E> where
    Self: Sized,
    F: FnMut(Self::Item) -> Result<(), E>, 

Applies a fallible function to each element in a stream, stopping at the first error and returning it.

Examples

use std::collections::VecDeque;
use std::sync::mpsc::channel;
use async_std::prelude::*;

let (tx, rx) = channel();

let s: VecDeque<usize> = vec![1, 2, 3].into_iter().collect();
let s = s.try_for_each(|v| {
    if v % 2 == 1 {
        tx.clone().send(v).unwrap();
        Ok(())
    } else {
        Err("even")
    }
});

let res = s.await;
drop(tx);
let values: Vec<_> = rx.iter().collect();

assert_eq!(values, vec![1]);
assert_eq!(res, Err("even"));

fn zip<U>(self, other: U) -> Zip<Self, U> where
    Self: Sized + Stream,
    U: Stream

'Zips up' two streams into a single stream of pairs.

zip() returns a new stream that will iterate over two other streams, returning a tuple where the first element comes from the first stream, and the second element comes from the second stream.

In other words, it zips two streams together, into a single one.

If either stream returns None, poll_next from the zipped stream will return None. If the first stream returns None, zip will short-circuit and poll_next will not be called on the second stream.

Examples

use std::collections::VecDeque;

use async_std::prelude::*;

let l: VecDeque<isize> = vec![1, 2, 3].into_iter().collect();
let r: VecDeque<isize> = vec![4, 5, 6, 7].into_iter().collect();
let mut s = l.zip(r);

assert_eq!(s.next().await, Some((1, 4)));
assert_eq!(s.next().await, Some((2, 5)));
assert_eq!(s.next().await, Some((3, 6)));
assert_eq!(s.next().await, None);

#[must_use = "if you really need to exhaust the iterator, consider `.for_each(drop)` instead (TODO)"] fn collect<'a, B>(self) -> ImplFuture<'a, B> where
    Self: Sized + 'a,
    B: FromStream<Self::Item>, 

This is supported on unstable only.

Transforms a stream into a collection.

collect() can take anything streamable, and turn it into a relevant collection. This is one of the more powerful methods in the async standard library, used in a variety of contexts.

The most basic pattern in which collect() is used is to turn one collection into another. You take a collection, call stream on it, do a bunch of transformations, and then collect() at the end.

Because collect() is so general, it can cause problems with type inference. As such, collect() is one of the few times you'll see the syntax affectionately known as the 'turbofish': ::<>. This helps the inference algorithm understand specifically which collection you're trying to collect into.

Examples

use async_std::prelude::*;
use async_std::stream;

let s = stream::repeat(9u8).take(3);
let buf: Vec<u8> = s.collect().await;

assert_eq!(buf, vec![9; 3]);

// You can also collect streams of Result values
// into any collection that implements FromStream
let s = stream::repeat(Ok(9)).take(3);
// We are using Vec here, but other collections
// are supported as well
let buf: Result<Vec<u8>, ()> = s.collect().await;

assert_eq!(buf, Ok(vec![9; 3]));

// The stream will stop on the first Err and
// return that instead
let s = stream::repeat(Err(5)).take(3);
let buf: Result<Vec<u8>, u8> = s.collect().await;

assert_eq!(buf, Err(5));
Loading content...

Implementations on Foreign Types

impl<S: Stream + Unpin + ?Sized> Stream for Box<S>[src]

type Item = S::Item

impl<'_, S: Stream + Unpin + ?Sized> Stream for &'_ mut S[src]

type Item = S::Item

impl<T: Unpin> Stream for VecDeque<T>[src]

type Item = T

impl<S: Stream> Stream for AssertUnwindSafe<S>[src]

type Item = S::Item

Loading content...

Implementors

impl Stream for ReadDir[src]

type Item = Result<DirEntry>

impl<'_> Stream for async_std::os::unix::net::Incoming<'_>[src]

type Item = Result<UnixStream>

impl<'a> Stream for async_std::net::Incoming<'a>[src]

type Item = Result<TcpStream>

impl<A: Stream, B: Stream> Stream for Zip<A, B>[src]

type Item = (A::Item, B::Item)

impl<P> Stream for Pin<P> where
    P: DerefMut + Unpin,
    <P as Deref>::Target: Stream
[src]

type Item = <<P as Deref>::Target as Stream>::Item

impl<R: BufRead> Stream for Lines<R>[src]

type Item = Result<String>

impl<S> Stream for Skip<S> where
    S: Stream
[src]

type Item = S::Item

impl<S> Stream for StepBy<S> where
    S: Stream
[src]

type Item = S::Item

impl<S, F> Stream for Inspect<S, F, S::Item> where
    S: Stream,
    F: FnMut(&S::Item), 
[src]

type Item = S::Item

impl<S, P> Stream for Filter<S, P, S::Item> where
    S: Stream,
    P: FnMut(&S::Item) -> bool
[src]

type Item = S::Item

impl<S, P> Stream for SkipWhile<S, P, S::Item> where
    S: Stream,
    P: FnMut(&S::Item) -> bool
[src]

type Item = S::Item

impl<S, St, F, B> Stream for Scan<S, St, F> where
    S: Stream,
    F: FnMut(&mut St, S::Item) -> Option<B>, 
[src]

type Item = B

impl<S: Stream> Stream for Fuse<S>[src]

type Item = S::Item

impl<S: Stream> Stream for Take<S>[src]

type Item = S::Item

impl<S: Stream, U: Stream<Item = S::Item>> Stream for Chain<S, U>[src]

type Item = S::Item

impl<T> Stream for Empty<T>[src]

type Item = T

impl<T: Clone> Stream for Repeat<T>[src]

type Item = T

impl<T: Unpin> Stream for Once<T>[src]

type Item = T

Loading content...