[][src]Crate futures_async_stream

Async stream API experiment that may be introduced as a language feature in the future.

This crate provides useful features for streams, using unstable async_await and generators.

#[for_await]

Processes streams using a for loop.

This is a reimplement of futures-await's #[async] for loops for futures 0.3 and is an experimental implementation of the idea listed as the next step of async/await.

#![feature(async_await, stmt_expr_attributes, proc_macro_hygiene)]
use futures::stream::Stream;
use futures_async_stream::for_await;

async fn collect(stream: impl Stream<Item = i32>) -> Vec<i32> {
    let mut vec = Vec::new();
    #[for_await]
    for value in stream {
        vec.push(value);
    }
    vec
}

value has the Item type of the stream passed in. Note that async for loops can only be used inside of async functions, closures, blocks, #[async_stream] functions and async_stream_block! macros.

#[async_stream]

Creates streams via generators.

This is a reimplement of futures-await's #[async_stream] for futures 0.3 and is an experimental implementation of the idea listed as the next step of async/await.

#![feature(async_await, generators)]
use futures::stream::Stream;
use futures_async_stream::async_stream;

// Returns a stream of i32
#[async_stream(item = i32)]
async fn foo(stream: impl Stream<Item = String>) {
    // `for_await` is built into `async_stream`. If you use `for_await` only in `async_stream`, there is no need to import `for_await`.
    #[for_await]
    for x in stream {
        yield x.parse().unwrap();
    }
}

#[async_stream] must have an item type specified via item = some::Path and the values output from the stream must be yielded via the yield expression.

async_stream_block!

You can create a stream directly as an expression using an async_stream_block! macro:

#![feature(async_await, generators, proc_macro_hygiene)]
use futures::stream::Stream;
use futures_async_stream::async_stream_block;

fn foo() -> impl Stream<Item = i32> {
    async_stream_block! {
        for i in 0..10 {
            yield i;
        }
    }
}

Using async stream functions in traits

You can use async stream functions in traits by passing boxed or boxed_local as an argument.

#![feature(async_await, generators)]
use futures_async_stream::async_stream;

trait Foo {
    #[async_stream(boxed, item = u32)]
    async fn method(&mut self);
}

struct Bar(u32);

impl Foo for Bar {
    #[async_stream(boxed, item = u32)]
    async fn method(&mut self) {
        while self.0 < u32::max_value() {
            self.0 += 1;
            yield self.0;
        }
    }
}

A async stream function that received a boxed argument is converted to a function that returns Pin<Box<dyn Stream<Item = item> + Send + 'lifetime>>. If you passed boxed_local instead of boxed, async stream function returns a non-threadsafe stream (Pin<Box<dyn Stream<Item = item> + 'lifetime>>).

#![feature(async_await, generators)]
use futures::stream::Stream;
use futures_async_stream::async_stream;
use std::pin::Pin;

// The trait itself can be defined without unstable features.
trait Foo {
    fn method(&mut self) -> Pin<Box<dyn Stream<Item = u32> + Send + '_>>;
}

struct Bar(u32);

impl Foo for Bar {
    #[async_stream(boxed, item = u32)]
    async fn method(&mut self) {
        while self.0 < u32::max_value() {
            self.0 += 1;
            yield self.0;
        }
    }
}

How to write the equivalent code without this API?

#[for_await]

You can write this by combining while let loop, .await, pin_mut macro, and StreamExt::next() method:

#![feature(async_await)]
use futures::{
    pin_mut,
    stream::{Stream, StreamExt},
};

async fn collect(stream: impl Stream<Item = i32>) -> Vec<i32> {
    let mut vec = Vec::new();
    pin_mut!(stream);
    while let Some(value) = stream.next().await {
        vec.push(value);
    }
    vec
}

#[async_stream]

You can write this by manually implementing the combinator:

#![feature(async_await)]
use futures::{
    stream::Stream,
    ready,
    task::{Context, Poll},
};
use pin_project::pin_project;
use std::pin::Pin;

fn foo<S>(stream: S) -> impl Stream<Item = i32>
where
    S: Stream<Item = String>,
{
    Foo { stream }
}

#[pin_project]
struct Foo<S> {
    #[pin]
    stream: S,
}

impl<S> Stream for Foo<S>
where
    S: Stream<Item = String>,
{
    type Item = i32;

    fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
        let this = self.project();
        if let Some(x) = ready!(this.stream.poll_next(cx)) {
            Poll::Ready(Some(x.parse().unwrap()))
        } else {
            Poll::Ready(None)
        }
    }
}

Re-exports

pub use futures_async_stream_macro::for_await;
pub use futures_async_stream_macro::async_stream;
pub use futures_async_stream_macro::async_stream_block;