Crate async_stream

Asynchronous stream of elements.

Provides two macros, stream! and try_stream!, that let the caller define asynchronous streams of elements. Both are implemented using async/await notation. The stream! macro requires only #![feature(async_await)].

The stream! macro returns an anonymous type implementing the Stream trait. The Item associated type is the type of the values yielded from the stream. The try_stream! macro also returns an anonymous type implementing the Stream trait, but the Item associated type is Result<T, Error>. The try_stream! macro supports using ? notation as part of the implementation.

Usage

A basic stream yielding numbers. Values are yielded using the yield keyword. The stream block must return ().

#![feature(async_await)]

use tokio::prelude::*;

use async_stream::stream;
use futures_util::pin_mut;

#[tokio::main]
async fn main() {
    let s = stream! {
        for i in 0..3 {
            yield i;
        }
    };

    pin_mut!(s); // needed for iteration

    while let Some(value) = s.next().await {
        println!("got {}", value);
    }
}

Streams may be returned by using impl Stream<Item = T>:

#![feature(async_await)]

use tokio::prelude::*;

use async_stream::stream;
use futures_util::pin_mut;

fn zero_to_three() -> impl Stream<Item = u32> {
    stream! {
        for i in 0..3 {
            yield i;
        }
    }
}

#[tokio::main]
async fn main() {
    let s = zero_to_three();
    pin_mut!(s); // needed for iteration

    while let Some(value) = s.next().await {
        println!("got {}", value);
    }
}

Streams may be implemented in terms of other streams:

#![feature(async_await)]

use tokio::prelude::*;

use async_stream::stream;
use futures_util::pin_mut;

fn zero_to_three() -> impl Stream<Item = u32> {
    stream! {
        for i in 0..3 {
            yield i;
        }
    }
}

fn double<S: Stream<Item = u32>>(input: S)
    -> impl Stream<Item = u32>
{
    stream! {
        pin_mut!(input);
        while let Some(value) = input.next().await {
            yield value * 2;
        }
    }
}

#[tokio::main]
async fn main() {
    let s = double(zero_to_three());
    pin_mut!(s); // needed for iteration

    while let Some(value) = s.next().await {
        println!("got {}", value);
    }
}

Rust try notation (?) can be used with the try_stream! macro. The Item of the returned stream is a Result, with Ok holding the yielded value and Err holding the error returned by ?.

#![feature(async_await)]

use tokio::net::{TcpListener, TcpStream};
use tokio::prelude::*;

use async_stream::try_stream;
use std::io;
use std::net::SocketAddr;

fn bind_and_accept(addr: SocketAddr)
    -> impl Stream<Item = io::Result<TcpStream>>
{
    try_stream! {
        let mut listener = TcpListener::bind(&addr)?;

        loop {
            let (stream, addr) = listener.accept().await?;
            println!("received on {:?}", addr);
            yield stream;
        }
    }
}

Implementation

The stream! and try_stream! macros are implemented using proc macros. Given that proc macros in expression position are not supported on stable Rust, a hack similar to the one provided by the proc-macro-hack crate is used. The macro searches the syntax tree for instances of yield $expr and transforms them into sender.send($expr).await.

The stream uses a lightweight sender to send values from the stream implementation to the caller. When entering the stream, an Option<T> cell is stored on the stack. A pointer to the cell is stored in a thread local and poll is called on the async block. During that poll, sender.send(value) stores the value in that cell and yields back to the caller, at which point poll returns.
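The mechanism described above can be modelled with the standard library alone. The following is a minimal sketch, not the crate's actual code: the names SLOT, SendFuture, send, NoopWake, and collect_all are hypothetical, the item type is fixed to u32, and a busy loop with a no-op waker stands in for a real executor. It shows a stack slot published through a thread local, and a send future that stores its value in that slot and returns Pending once so control goes back to the caller.

```rust
use std::cell::Cell;
use std::future::Future;
use std::pin::Pin;
use std::ptr;
use std::sync::Arc;
use std::task::{Context, Poll, Wake, Waker};

thread_local! {
    // Pointer to the caller's stack slot; non-null only while the block is polled.
    static SLOT: Cell<*mut Option<u32>> = Cell::new(ptr::null_mut());
}

// Hypothetical stand-in for the future returned by `sender.send(value)`.
struct SendFuture {
    value: Option<u32>,
}

impl Future for SendFuture {
    type Output = ();

    fn poll(mut self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<()> {
        match self.value.take() {
            // First poll: store the value in the caller's slot, then yield.
            Some(v) => {
                SLOT.with(|slot| {
                    let cell = slot.get();
                    assert!(!cell.is_null(), "send polled outside collect_all");
                    // Safety: collect_all keeps the slot alive for the whole poll.
                    unsafe { *cell = Some(v) };
                });
                Poll::Pending
            }
            // Second poll: the value was already handed over, so resume.
            None => Poll::Ready(()),
        }
    }
}

fn send(value: u32) -> SendFuture {
    SendFuture { value: Some(value) }
}

// Waker that does nothing; the loop below simply polls again.
struct NoopWake;

impl Wake for NoopWake {
    fn wake(self: Arc<Self>) {}
}

// Drives the block to completion, collecting every value it sends.
fn collect_all<F: Future<Output = ()>>(block: F) -> Vec<u32> {
    let waker = Waker::from(Arc::new(NoopWake));
    let mut cx = Context::from_waker(&waker);
    let mut block = Box::pin(block);
    let mut out = Vec::new();
    loop {
        let mut slot: Option<u32> = None;
        // Publish the slot, poll the block once, then withdraw the pointer.
        SLOT.with(|s| s.set(&mut slot as *mut Option<u32>));
        let res = block.as_mut().poll(&mut cx);
        SLOT.with(|s| s.set(ptr::null_mut()));
        if let Some(v) = slot.take() {
            out.push(v);
        }
        if res.is_ready() {
            return out;
        }
    }
}

fn zero_to_three() -> Vec<u32> {
    collect_all(async {
        for i in 0..3 {
            send(i).await; // stands in for `yield i;` after the macro rewrite
        }
    })
}

fn main() {
    println!("{:?}", zero_to_three()); // prints [0, 1, 2]
}
```

In this model each call to poll hands back at most one value, which mirrors how a single poll of the stream produces a single item. A real implementation would be generic over the item type and would return Pending to its own caller instead of looping.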

Limitations

async-stream suffers from the same limitations as the proc-macro-hack crate. Primarily, nesting support must be implemented using a TT-muncher. If large stream! blocks are used, the caller will be required to add #![recursion_limit = "..."] to their crate.

A stream! macro may only contain up to 64 macro invocations.

Macros

stream      Asynchronous stream
try_stream  Asynchronous fallible stream