pub fn async_stream<T, F, U>(generator: F) -> AsyncStream<T, U>
Create an asynchronous `Stream` from an asynchronous generator function.
The provided function will be given a `Yielder`; calling its `r#yield` method and awaiting the result causes the stream to yield an item:
use async_stream_lite::async_stream;
use futures::{pin_mut, stream::StreamExt};
#[tokio::main]
async fn main() {
let stream = async_stream(|yielder| async move {
for i in 0..3 {
yielder.r#yield(i).await;
}
});
pin_mut!(stream);
while let Some(value) = stream.next().await {
println!("{value}");
}
}

Streams may be returned by using `impl Stream<Item = T>`:
use async_stream_lite::async_stream;
use futures::{
pin_mut,
stream::{Stream, StreamExt}
};
fn zero_to_three() -> impl Stream<Item = u32> {
async_stream(|yielder| async move {
for i in 0..3 {
yielder.r#yield(i).await;
}
})
}
#[tokio::main]
async fn main() {
let stream = zero_to_three();
pin_mut!(stream);
while let Some(value) = stream.next().await {
println!("{value}");
}
}

Or by boxing the stream and returning it as a `futures::stream::BoxStream`:
use async_stream_lite::async_stream;
use futures::{
pin_mut,
stream::{BoxStream, StreamExt}
};
fn zero_to_three() -> BoxStream<'static, u32> {
Box::pin(async_stream(|yielder| async move {
for i in 0..3 {
yielder.r#yield(i).await;
}
}))
}
#[tokio::main]
async fn main() {
let mut stream = zero_to_three();
while let Some(value) = stream.next().await {
println!("{value}");
}
}

Streams may also be implemented in terms of other streams:
use async_stream_lite::async_stream;
use futures::{
pin_mut,
stream::{Stream, StreamExt}
};
fn zero_to_three() -> impl Stream<Item = u32> {
async_stream(|yielder| async move {
for i in 0..3 {
yielder.r#yield(i).await;
}
})
}
fn double<S: Stream<Item = u32>>(input: S) -> impl Stream<Item = u32> {
async_stream(|yielder| async move {
pin_mut!(input);
while let Some(value) = input.next().await {
yielder.r#yield(value * 2).await;
}
})
}
#[tokio::main]
async fn main() {
let stream = double(zero_to_three());
pin_mut!(stream);
while let Some(value) = stream.next().await {
println!("{value}");
}
}

See also `try_async_stream`, a variant of `async_stream` which supports try notation (`?`).