[−][src]Crate async_stream
Asynchronous stream of elements.
Provides two macros, stream! and try_stream!, allowing the caller to
define asynchronous streams of elements. These are implemented using async
& await notation. The stream! macro works using only
#[feature(async_await)].
The stream! macro returns an anonymous type implementing the Stream
trait. The Item associated type is the type of the values yielded from the
stream. The try_stream! also returns an anonymous type implementing the
Stream trait, but the Item associated type is Result<T, Error>. The
try_stream! macro supports using ? notiation as part of the
implementation.
Usage
A basic stream yielding numbers. Values are yielded using the yield
keyword. The stream block must return ().
#![feature(async_await)] use tokio::prelude::*; use async_stream::stream; use futures_util::pin_mut; #[tokio::main] async fn main() { let s = stream! { for i in 0..3 { yield i; } }; pin_mut!(s); // needed for iteration while let Some(value) = s.next().await { println!("got {}", value); } }
Streams may be returned by using impl Stream<Item = T>:
#![feature(async_await)] use tokio::prelude::*; use async_stream::stream; use futures_util::pin_mut; fn zero_to_three() -> impl Stream<Item = u32> { stream! { for i in 0..3 { yield i; } } } #[tokio::main] async fn main() { let s = zero_to_three(); pin_mut!(s); // needed for iteration while let Some(value) = s.next().await { println!("got {}", value); } }
Streams may be implemented in terms of other streams:
#![feature(async_await)] use tokio::prelude::*; use async_stream::stream; use futures_util::pin_mut; fn zero_to_three() -> impl Stream<Item = u32> { stream! { for i in 0..3 { yield i; } } } fn double<S: Stream<Item = u32>>(input: S) -> impl Stream<Item = u32> { stream! { pin_mut!(input); while let Some(value) = input.next().await { yield value * 2; } } } #[tokio::main] async fn main() { let s = double(zero_to_three()); pin_mut!(s); // needed for iteration while let Some(value) = s.next().await { println!("got {}", value); } }
Rust try notation (?) can be used with the try_stream! macro. The Item
of the returned stream is Result with Ok being the value yielded and
Err the error type returned by ?.
#![feature(async_await)] use tokio::net::{TcpListener, TcpStream}; use tokio::prelude::*; use async_stream::try_stream; use std::io; use std::net::SocketAddr; fn bind_and_accept(addr: SocketAddr) -> impl Stream<Item = io::Result<TcpStream>> { try_stream! { let mut listener = TcpListener::bind(&addr)?; loop { let (stream, addr) = listener.accept().await?; println!("received on {:?}", addr); yield stream; } } }
Implementation
The stream! and try_stream! macros are implemented using proc macros.
Given that proc macros in expression position are not supported on stable
rust, a hack similar to the one provided by the proc-macro-hack crate is
used. The macro searches the syntax tree for instances of sender.send($expr) and
transforms them into sender.send($expr).await.
The stream uses a lightweight sender to send values from the stream
implementation to the caller. When entering the stream, an Option<T> is
stored on the stack. A pointer to the cell is stored in a thread local and
poll is called on the async block. When poll returns.
sender.send(value) stores the value that cell and yields back to the
caller.
Limitations
async-stream suffers from the same limitations as the proc-macro-hack
crate. Primarily, nesting support must be implemented using a TT-muncher.
If large stream! blocks are used, the caller will be required to add
#![recursion_limit = "..."] to their crate.
A stream! macro may only contain up to 64 macro invocations.
Macros
| stream | Asynchronous stream |
| try_stream | Asynchronous fallible stream |