[−][src]Crate futures_async_stream
Async stream API experiment that may be introduced as a language feature in the future.
This crate provides useful features for streams, using async_await
and unstable generators
.
#[for_await]
Processes streams using a for loop.
This is a reimplement of futures-await's #[async]
for loops for futures 0.3 and is an experimental implementation of the idea listed as the next step of async/await.
#![feature(stmt_expr_attributes, proc_macro_hygiene)] use futures::stream::Stream; use futures_async_stream::for_await; async fn collect(stream: impl Stream<Item = i32>) -> Vec<i32> { let mut vec = Vec::new(); #[for_await] for value in stream { vec.push(value); } vec }
value
has the Item
type of the stream passed in. Note that async for loops can only be used inside of async
functions, closures, blocks, #[async_stream]
functions and async_stream_block!
macros.
#[async_stream]
Creates streams via generators.
This is a reimplement of futures-await's #[async_stream]
for futures 0.3 and is an experimental implementation of the idea listed as the next step of async/await.
#![feature(generators)] use futures::stream::Stream; use futures_async_stream::async_stream; // Returns a stream of i32 #[async_stream(item = i32)] async fn foo(stream: impl Stream<Item = String>) { // `for_await` is built into `async_stream`. If you use `for_await` only in `async_stream`, there is no need to import `for_await`. #[for_await] for x in stream { yield x.parse().unwrap(); } }
#[async_stream]
must have an item type specified via item = some::Path
and the values output from the stream must be yielded via the yield
expression.
async_stream_block!
You can create a stream directly as an expression using an async_stream_block!
macro:
#![feature(generators, proc_macro_hygiene)] use futures::stream::Stream; use futures_async_stream::async_stream_block; fn foo() -> impl Stream<Item = i32> { async_stream_block! { for i in 0..10 { yield i; } } }
Using async stream functions in traits
You can use async stream functions in traits by passing boxed
or boxed_local
as an argument.
#![feature(generators)] use futures_async_stream::async_stream; trait Foo { #[async_stream(boxed, item = u32)] async fn method(&mut self); } struct Bar(u32); impl Foo for Bar { #[async_stream(boxed, item = u32)] async fn method(&mut self) { while self.0 < u32::max_value() { self.0 += 1; yield self.0; } } }
A async stream function that received a boxed
argument is converted to a function that returns Pin<Box<dyn Stream<Item = item> + Send + 'lifetime>>
.
If you passed boxed_local
instead of boxed
, async stream function returns a non-threadsafe stream (Pin<Box<dyn Stream<Item = item> + 'lifetime>>
).
#![feature(generators)] use futures::stream::Stream; use futures_async_stream::async_stream; use std::pin::Pin; // The trait itself can be defined without unstable features. trait Foo { fn method(&mut self) -> Pin<Box<dyn Stream<Item = u32> + Send + '_>>; } struct Bar(u32); impl Foo for Bar { #[async_stream(boxed, item = u32)] async fn method(&mut self) { while self.0 < u32::max_value() { self.0 += 1; yield self.0; } } }
#[async_try_stream] and async_try_stream_block!
?
operator can be used with the #[async_try_stream]
and async_try_stream_block!
. The Item
of the returned stream is Result
with Ok
being the value yielded and Err
the error type returned by ?
operator or return Err(...)
.
#![feature(generators)] use futures::stream::Stream; use futures_async_stream::async_try_stream; #[async_try_stream(ok = i32, error = Box<dyn std::error::Error + Send + Sync>)] async fn foo(stream: impl Stream<Item = String>) { #[for_await] for x in stream { yield x.parse()?; } }
How to write the equivalent code without this API?
#[for_await]
You can write this by combining while let
loop, .await
, pin_mut
macro, and StreamExt::next()
method:
use futures::{ pin_mut, stream::{Stream, StreamExt}, }; async fn collect(stream: impl Stream<Item = i32>) -> Vec<i32> { let mut vec = Vec::new(); pin_mut!(stream); while let Some(value) = stream.next().await { vec.push(value); } vec }
#[async_stream]
You can write this by manually implementing the combinator:
use futures::{ ready, stream::Stream, task::{Context, Poll}, }; use pin_project::pin_project; use std::pin::Pin; fn foo<S>(stream: S) -> impl Stream<Item = i32> where S: Stream<Item = String>, { Foo { stream } } #[pin_project] struct Foo<S> { #[pin] stream: S, } impl<S> Stream for Foo<S> where S: Stream<Item = String>, { type Item = i32; fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> { if let Some(x) = ready!(self.project().stream.poll_next(cx)) { Poll::Ready(Some(x.parse().unwrap())) } else { Poll::Ready(None) } } }
Macros
async_stream_block | Creates streams via generators. |
async_try_stream_block | Creates streams via generators. |
Attribute Macros
async_stream | Creates streams via generators. |
async_try_stream | Creates streams via generators. |
for_await | Processes streams using a for loop. |