Expand description
A version of async-stream without macros.
This crate provides generic implementations of Stream
trait.
Stream
is an asynchronous version of std::iter::Iterator
.
Two functions are provided - fn_stream
and try_fn_stream
.
§Usage
§Basic Usage
If you need to create a stream that may result in error, use try_fn_stream
, otherwise use fn_stream
.
To create a stream:
- Invoke
fn_stream
ortry_fn_stream
, passing a closure (anonymous function). - Closure will accept an
emitter
. To return value from the stream, call.emit(value)
onemitter
and.await
on its result. Once stream consumer has processed the value and called.next()
on stream,.await
will return.
§Returning errors
try_fn_stream
provides some conveniences for returning errors:
- Errors can be return from closure via
return Err(...)
or the question mark (?
) operator. This will end the stream. - An
emitter
also has anemit_err()
method to return errors without ending the stream.
§Limitations
fn_stream
does not support cross-task streams, that is all stream values must be produced in the same task as the stream.
Specifically, it is not supported to move StreamEmitter
to another thread or tokio task.
If your use case necessitates this, consider using async chanells for that.
§Advanced usage
Internal concurrency is supported within fn_stream
(see examples/join.rs).
§Examples
Finite stream of numbers
use async_fn_stream::fn_stream;
use futures_util::Stream;
fn build_stream() -> impl Stream<Item = i32> {
fn_stream(|emitter| async move {
for i in 0..3 {
// yield elements from stream via `emitter`
emitter.emit(i).await;
}
})
}
Read numbers from text file, with error handling
use anyhow::Context;
use async_fn_stream::try_fn_stream;
use futures_util::{pin_mut, Stream, StreamExt};
use tokio::{
fs::File,
io::{AsyncBufReadExt, BufReader},
};
fn read_numbers(file_name: String) -> impl Stream<Item = Result<i32, anyhow::Error>> {
try_fn_stream(|emitter| async move {
// Return errors via `?` operator.
let file = BufReader::new(File::open(file_name).await.context("Failed to open file")?);
pin_mut!(file);
let mut line = String::new();
loop {
line.clear();
let byte_count = file
.read_line(&mut line)
.await
.context("Failed to read line")?;
if byte_count == 0 {
break;
}
for token in line.split_ascii_whitespace() {
let Ok(number) = token.parse::<i32>() else {
// Return errors via the `emit_err` method.
emitter.emit_err(
anyhow::anyhow!("Failed to convert string \"{token}\" to number")
).await;
continue;
};
emitter.emit(number).await;
}
}
Ok(())
})
}
§Why not async-stream
?
async-stream is great! It has a nice syntax, but it is based on macros which brings some flaws:
- proc-macros sometimes interact badly with IDEs such as
rust-analyzer
orRustRover
. see e.g. https://github.com/rust-lang/rust-analyzer/issues/11533 - proc-macros may increase build times
Structs§
- Emit
Future - Future returned from
StreamEmitter::emit
. - FnStream
- Implementation of
Stream
trait created byfn_stream
. - Stream
Emitter - An intermediary that transfers values from stream to its consumer
- TryFn
Stream - Implementation of
Stream
trait created bytry_fn_stream
. - TryStream
Emitter - An intermediary that transfers values from stream to its consumer
Functions§
- fn_
stream - Create a new infallible stream which is implemented by
func
. - try_
fn_ stream - Create a new fallible stream which is implemented by
func
.