pub fn async_stream<T, F, U>(generator: F) -> AsyncStream<T, U>
Expand description
Create an asynchronous `Stream` from an asynchronous generator function.
The provided function will be given a `Yielder`, which, when called, causes the stream to yield an item:
use async_stream_lite::async_stream;
use futures::{pin_mut, stream::StreamExt};
#[tokio::main]
async fn main() {
let stream = async_stream(|yielder| async move {
for i in 0..3 {
yielder.r#yield(i).await;
}
});
pin_mut!(stream);
while let Some(value) = stream.next().await {
println!("{value}");
}
}
Streams may be returned by using `impl Stream<Item = T>`:
use async_stream_lite::async_stream;
use futures::{
pin_mut,
stream::{Stream, StreamExt}
};
fn zero_to_three() -> impl Stream<Item = u32> {
async_stream(|yielder| async move {
for i in 0..3 {
yielder.r#yield(i).await;
}
})
}
#[tokio::main]
async fn main() {
let stream = zero_to_three();
pin_mut!(stream);
while let Some(value) = stream.next().await {
println!("{value}");
}
}
or with `futures::stream::BoxStream`:
use async_stream_lite::async_stream;
use futures::{
pin_mut,
stream::{BoxStream, StreamExt}
};
fn zero_to_three() -> BoxStream<'static, u32> {
Box::pin(async_stream(|yielder| async move {
for i in 0..3 {
yielder.r#yield(i).await;
}
}))
}
#[tokio::main]
async fn main() {
let mut stream = zero_to_three();
while let Some(value) = stream.next().await {
println!("{value}");
}
}
Streams may also be implemented in terms of other streams:
use async_stream_lite::async_stream;
use futures::{
pin_mut,
stream::{Stream, StreamExt}
};
fn zero_to_three() -> impl Stream<Item = u32> {
async_stream(|yielder| async move {
for i in 0..3 {
yielder.r#yield(i).await;
}
})
}
fn double<S: Stream<Item = u32>>(input: S) -> impl Stream<Item = u32> {
async_stream(|yielder| async move {
pin_mut!(input);
while let Some(value) = input.next().await {
yielder.r#yield(value * 2).await;
}
})
}
#[tokio::main]
async fn main() {
let stream = double(zero_to_three());
pin_mut!(stream);
while let Some(value) = stream.next().await {
println!("{value}");
}
}
See also `try_async_stream`, a variant of `async_stream` which supports try notation (`?`).