Trait async_std::stream::Stream
An asynchronous stream of values.
This trait is an async version of `std::iter::Iterator`.
While it is currently not possible to implement this trait directly, it gets implemented
automatically for all types that implement `futures::stream::Stream`.
Associated Types
type Item
The type of items yielded by this stream.
Required methods
fn poll_next(self: Pin<&mut Self>, cx: &mut Context) -> Poll<Option<Self::Item>>
Attempts to receive the next item from the stream.
There are several possible return values:
- `Poll::Pending` means this stream's next value is not ready yet.
- `Poll::Ready(None)` means this stream has been exhausted.
- `Poll::Ready(Some(item))` means `item` was received out of the stream.
Examples
use std::pin::Pin; use async_std::prelude::*; use async_std::stream; use async_std::task::{Context, Poll}; fn increment(s: impl Stream<Item = i32> + Unpin) -> impl Stream<Item = i32> + Unpin { struct Increment<S>(S); impl<S: Stream<Item = i32> + Unpin> Stream for Increment<S> { type Item = S::Item; fn poll_next( mut self: Pin<&mut Self>, cx: &mut Context<'_>, ) -> Poll<Option<Self::Item>> { match Pin::new(&mut self.0).poll_next(cx) { Poll::Pending => Poll::Pending, Poll::Ready(None) => Poll::Ready(None), Poll::Ready(Some(item)) => Poll::Ready(Some(item + 1)), } } } Increment(s) } let mut s = increment(stream::once(7)); assert_eq!(s.next().await, Some(8)); assert_eq!(s.next().await, None);
Provided methods
fn next(&mut self) -> ImplFuture<Option<Self::Item>> where
Self: Unpin,
Self: Unpin,
Advances the stream and returns the next value.
Returns `None` when iteration is finished. Individual stream implementations may
choose to resume iteration, and so calling `next()` again may or may not eventually
start returning more values.
Examples
use async_std::prelude::*; use async_std::stream; let mut s = stream::once(7); assert_eq!(s.next().await, Some(7)); assert_eq!(s.next().await, None);
fn take(self, n: usize) -> Take<Self> where
Self: Sized,
Self: Sized,
Creates a stream that yields its first `n` elements.
Examples
use async_std::prelude::*; use async_std::stream; let mut s = stream::repeat(9).take(3); while let Some(v) = s.next().await { assert_eq!(v, 9); }
fn enumerate(self) -> Enumerate<Self> where
Self: Sized,
Self: Sized,
Creates a stream that gives the current element's count as well as the next value.
Overflow behaviour.
This combinator does no guarding against overflows.
Examples
use async_std::prelude::*; use std::collections::VecDeque; let s: VecDeque<_> = vec!['a', 'b', 'c'].into_iter().collect(); let mut s = s.enumerate(); assert_eq!(s.next().await, Some((0, 'a'))); assert_eq!(s.next().await, Some((1, 'b'))); assert_eq!(s.next().await, Some((2, 'c'))); assert_eq!(s.next().await, None);
fn fuse(self) -> Fuse<Self> where
Self: Sized,
Self: Sized,
Transforms this `Stream` into a "fused" `Stream` such that after the first time
`poll_next` returns `Poll::Ready(None)`, all future calls to `poll_next` will also
return `Poll::Ready(None)`.
Examples
use async_std::prelude::*; use async_std::stream; let mut s = stream::once(1).fuse(); assert_eq!(s.next().await, Some(1)); assert_eq!(s.next().await, None); assert_eq!(s.next().await, None);
fn filter_map<B, F>(self, f: F) -> FilterMap<Self, F, Self::Item, B> where
Self: Sized,
F: FnMut(Self::Item) -> Option<B>,
Self: Sized,
F: FnMut(Self::Item) -> Option<B>,
Both filters and maps a stream.
Examples
Basic usage:
use std::collections::VecDeque; use async_std::stream::Stream; let s: VecDeque<&str> = vec!["1", "lol", "3", "NaN", "5"].into_iter().collect(); let mut parsed = s.filter_map(|a| a.parse::<u32>().ok()); let one = parsed.next().await; assert_eq!(one, Some(1)); let three = parsed.next().await; assert_eq!(three, Some(3)); let five = parsed.next().await; assert_eq!(five, Some(5)); let end = parsed.next().await; assert_eq!(end, None);
fn min_by<F>(self, compare: F) -> MinByFuture<Self, F, Self::Item> where
Self: Sized,
F: FnMut(&Self::Item, &Self::Item) -> Ordering,
Self: Sized,
F: FnMut(&Self::Item, &Self::Item) -> Ordering,
Returns the element that gives the minimum value with respect to the
specified comparison function. If several elements are equally minimum,
the first element is returned. If the stream is empty, `None` is returned.
Examples
use std::collections::VecDeque; use async_std::stream::Stream; let s: VecDeque<usize> = vec![1, 2, 3].into_iter().collect(); let min = Stream::min_by(s.clone(), |x, y| x.cmp(y)).await; assert_eq!(min, Some(1)); let min = Stream::min_by(s, |x, y| y.cmp(x)).await; assert_eq!(min, Some(3)); let min = Stream::min_by(VecDeque::<usize>::new(), |x, y| x.cmp(y)).await; assert_eq!(min, None);
fn nth(&mut self, n: usize) -> ImplFuture<Option<Self::Item>> where
Self: Sized,
Self: Sized,
Returns the nth element of the stream.
Examples
Basic usage:
use std::collections::VecDeque; use async_std::stream::Stream; let mut s: VecDeque<usize> = vec![1, 2, 3].into_iter().collect(); let second = s.nth(1).await; assert_eq!(second, Some(2));
Calling `nth()` multiple times:
use std::collections::VecDeque; use async_std::stream::Stream; let mut s: VecDeque<usize> = vec![1, 2, 3].into_iter().collect(); let second = s.nth(0).await; assert_eq!(second, Some(1)); let second = s.nth(0).await; assert_eq!(second, Some(2));
Returning `None` if the stream finishes before yielding `n + 1` elements
(`n` is zero-based):
use std::collections::VecDeque; use async_std::stream::Stream; let mut s: VecDeque<usize> = vec![1, 2, 3].into_iter().collect(); let fourth = s.nth(4).await; assert_eq!(fourth, None);
fn all<F>(&mut self, f: F) -> ImplFuture<bool> where
Self: Unpin + Sized,
F: FnMut(Self::Item) -> bool,
Self: Unpin + Sized,
F: FnMut(Self::Item) -> bool,
Tests if every element of the stream matches a predicate.
`all()` takes a closure that returns `true` or `false`. It applies
this closure to each element of the stream, and if they all return
`true`, then so does `all()`. If any of them return `false`, it
returns `false`.
`all()` is short-circuiting; in other words, it will stop processing
as soon as it finds a `false`, given that no matter what else happens,
the result will also be `false`.
An empty stream returns `true`.
Examples
Basic usage:
use async_std::prelude::*; use async_std::stream; let mut s = stream::repeat::<u32>(42).take(3); assert!(s.all(|x| x == 42).await);
Empty stream:
use async_std::prelude::*; use async_std::stream; let mut s = stream::empty::<u32>(); assert!(s.all(|_| false).await);
fn find<P>(&mut self, p: P) -> ImplFuture<Option<Self::Item>> where
Self: Sized,
P: FnMut(&Self::Item) -> bool,
Self: Sized,
P: FnMut(&Self::Item) -> bool,
Searches for an element in a stream that satisfies a predicate.
Examples
Basic usage:
use async_std::prelude::*; use std::collections::VecDeque; let mut s: VecDeque<usize> = vec![1, 2, 3].into_iter().collect(); let res = s.find(|x| *x == 2).await; assert_eq!(res, Some(2));
Resuming after a first find:
use async_std::prelude::*; use std::collections::VecDeque; let mut s: VecDeque<usize> = vec![1, 2, 3].into_iter().collect(); let res = s.find(|x| *x == 2).await; assert_eq!(res, Some(2)); let next = s.next().await; assert_eq!(next, Some(3));
fn find_map<F, B>(&mut self, f: F) -> ImplFuture<Option<B>> where
Self: Sized,
F: FnMut(Self::Item) -> Option<B>,
Self: Sized,
F: FnMut(Self::Item) -> Option<B>,
Applies a function to the elements of the stream and returns the first non-`None` result.
use async_std::prelude::*; use std::collections::VecDeque; let mut s: VecDeque<&str> = vec!["lol", "NaN", "2", "5"].into_iter().collect(); let first_number = s.find_map(|s| s.parse().ok()).await; assert_eq!(first_number, Some(2));
fn fold<B, F>(self, init: B, f: F) -> FoldFuture<Self, F, Self::Item, B> where
Self: Sized,
F: FnMut(B, Self::Item) -> B,
Self: Sized,
F: FnMut(B, Self::Item) -> B,
A combinator that applies a function to every element in a stream producing a single, final value.
Examples
Basic usage:
use async_std::prelude::*; use std::collections::VecDeque; let s: VecDeque<usize> = vec![1, 2, 3].into_iter().collect(); let sum = s.fold(0, |acc, x| acc + x).await; assert_eq!(sum, 6);
fn any<F>(&mut self, f: F) -> ImplFuture<bool> where
Self: Unpin + Sized,
F: FnMut(Self::Item) -> bool,
Self: Unpin + Sized,
F: FnMut(Self::Item) -> bool,
Tests if any element of the stream matches a predicate.
`any()` takes a closure that returns `true` or `false`. It applies
this closure to each element of the stream, and if any of them return
`true`, then so does `any()`. If they all return `false`, it
returns `false`.
`any()` is short-circuiting; in other words, it will stop processing
as soon as it finds a `true`, given that no matter what else happens,
the result will also be `true`.
An empty stream returns `false`.
Examples
Basic usage:
use async_std::prelude::*; use async_std::stream; let mut s = stream::repeat::<u32>(42).take(3); assert!(s.any(|x| x == 42).await);
Empty stream:
use async_std::prelude::*; use async_std::stream; let mut s = stream::empty::<u32>(); assert!(!s.any(|_| false).await);
fn scan<St, B, F>(self, initial_state: St, f: F) -> Scan<Self, St, F> where
Self: Sized,
F: FnMut(&mut St, Self::Item) -> Option<B>,
Self: Sized,
F: FnMut(&mut St, Self::Item) -> Option<B>,
A stream adaptor similar to `fold` that holds internal state and produces a new stream.
`scan()` takes two arguments: an initial value which seeds the internal state, and a
closure with two arguments, the first being a mutable reference to the internal state and
the second a stream element. The closure can assign to the internal state to share state
between iterations.
On iteration, the closure will be applied to each element of the stream and the return
value from the closure, an `Option`, is yielded by the stream.
Examples
use std::collections::VecDeque; use async_std::stream::Stream; let s: VecDeque<isize> = vec![1, 2, 3].into_iter().collect(); let mut s = s.scan(1, |state, x| { *state = *state * x; Some(-*state) }); assert_eq!(s.next().await, Some(-1)); assert_eq!(s.next().await, Some(-2)); assert_eq!(s.next().await, Some(-6)); assert_eq!(s.next().await, None);
fn zip<U>(self, other: U) -> Zip<Self, U> where
Self: Sized,
U: Stream,
Self: Sized,
U: Stream,
'Zips up' two streams into a single stream of pairs.
`zip()` returns a new stream that will iterate over two other streams, returning a tuple
where the first element comes from the first stream, and the second element comes from the
second stream.
In other words, it zips two streams together, into a single one.
If either stream returns `None`, `poll_next` from the zipped stream will return
`None`. If the first stream returns `None`, `zip` will short-circuit and `poll_next`
will not be called on the second stream.
Examples
use std::collections::VecDeque; use async_std::stream::Stream; let l: VecDeque<isize> = vec![1, 2, 3].into_iter().collect(); let r: VecDeque<isize> = vec![4, 5, 6, 7].into_iter().collect(); let mut s = l.zip(r); assert_eq!(s.next().await, Some((1, 4))); assert_eq!(s.next().await, Some((2, 5))); assert_eq!(s.next().await, Some((3, 6))); assert_eq!(s.next().await, None);
#[must_use = "if you really need to exhaust the iterator, consider `.for_each(drop)` instead (TODO)"]
fn collect<'a, B>(self) -> DynFuture<'a, B> where
Self: Stream + Sized + Send + 'a,
Self::Item: Send,
B: FromStream<Self::Item>,
Self: Stream + Sized + Send + 'a,
Self::Item: Send,
B: FromStream<Self::Item>,
This is supported on `unstable` only.
Transforms a stream into a collection.
`collect()` can take anything streamable, and turn it into a relevant
collection. This is one of the more powerful methods in the async
standard library, used in a variety of contexts.
The most basic pattern in which `collect()` is used is to turn one
collection into another. You take a collection, call `stream` on it,
do a bunch of transformations, and then `collect()` at the end.
Because `collect()` is so general, it can cause problems with type
inference. As such, `collect()` is one of the few times you'll see
the syntax affectionately known as the 'turbofish': `::<>`. This
helps the inference algorithm understand specifically which collection
you're trying to collect into.
Examples
use async_std::prelude::*; use async_std::stream; let s = stream::repeat(9u8).take(3); let buf: Vec<u8> = s.collect().await; assert_eq!(buf, vec![9; 3]); // You can also collect streams of Result values // into any collection that implements FromStream let s = stream::repeat(Ok(9)).take(3); // We are using Vec here, but other collections // are supported as well let buf: Result<Vec<u8>, ()> = s.collect().await; assert_eq!(buf, Ok(vec![9; 3])); // The stream will stop on the first Err and // return that instead let s = stream::repeat(Err(5)).take(3); let buf: Result<Vec<u8>, u8> = s.collect().await; assert_eq!(buf, Err(5));