Function async_stream

Source
pub fn async_stream<T, F, U>(generator: F) -> AsyncStream<T, U>
where F: FnOnce(Yielder<T>) -> U, U: Future<Output = ()>,
Expand description

Create an asynchronous Stream from an asynchronous generator function.

The provided function will be given a Yielder, which, when called, causes the stream to yield an item:

use async_stream_lite::async_stream;
use futures::{pin_mut, stream::StreamExt};

#[tokio::main]
async fn main() {
	let stream = async_stream(|yielder| async move {
		for i in 0..3 {
			yielder.r#yield(i).await;
		}
	});
	pin_mut!(stream);
	while let Some(value) = stream.next().await {
		println!("{value}");
	}
}

Streams may be returned by using impl Stream<Item = T>:

use async_stream_lite::async_stream;
use futures::{
	pin_mut,
	stream::{Stream, StreamExt}
};

fn zero_to_three() -> impl Stream<Item = u32> {
	async_stream(|yielder| async move {
		for i in 0..3 {
			yielder.r#yield(i).await;
		}
	})
}

#[tokio::main]
async fn main() {
	let stream = zero_to_three();
	pin_mut!(stream);
	while let Some(value) = stream.next().await {
		println!("{value}");
	}
}

or with [futures::stream::BoxStream]:

use async_stream_lite::async_stream;
use futures::{
	pin_mut,
	stream::{BoxStream, StreamExt}
};

fn zero_to_three() -> BoxStream<'static, u32> {
	Box::pin(async_stream(|yielder| async move {
		for i in 0..3 {
			yielder.r#yield(i).await;
		}
	}))
}

#[tokio::main]
async fn main() {
	let mut stream = zero_to_three();
	while let Some(value) = stream.next().await {
		println!("{value}");
	}
}

Streams may also be implemented in terms of other streams:

use async_stream_lite::async_stream;
use futures::{
	pin_mut,
	stream::{Stream, StreamExt}
};

fn zero_to_three() -> impl Stream<Item = u32> {
	async_stream(|yielder| async move {
		for i in 0..3 {
			yielder.r#yield(i).await;
		}
	})
}

fn double<S: Stream<Item = u32>>(input: S) -> impl Stream<Item = u32> {
	async_stream(|yielder| async move {
		pin_mut!(input);
		while let Some(value) = input.next().await {
			yielder.r#yield(value * 2).await;
		}
	})
}

#[tokio::main]
async fn main() {
	let stream = double(zero_to_three());
	pin_mut!(stream);
	while let Some(value) = stream.next().await {
		println!("{value}");
	}
}

See also try_async_stream, a variant of async_stream which supports try notation (?).