An adaptor that chunks up completed futures in a stream and flushes them after a timeout or when the buffer is full.
It is based on the Chunks adaptor of futures-util, to which we added a timeout.
§Usage
Either as a standalone stream operator or directly as a combinator:
use std::time::Duration;
use futures::{stream, StreamExt};
use futures_batch::ChunksTimeoutStreamExt;

#[tokio::main]
async fn main() {
    let results = stream::iter(0..10)
        .chunks_timeout(5, Duration::from_secs(10))
        .collect::<Vec<_>>();

    assert_eq!(vec![vec![0, 1, 2, 3, 4], vec![5, 6, 7, 8, 9]], results.await);
}
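The above code iterates over a stream and creates chunks of size 5 with a timeout of 10 seconds. Because every item of stream::iter is available immediately, both chunks fill to capacity and the timeout never fires.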
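To make the two flush conditions (buffer full, timeout elapsed) concrete, here is a minimal synchronous sketch using only the standard library. It is a simplified model, not the crate's implementation: `chunk_with_timeout` is a hypothetical helper, arrival times are passed in explicitly instead of being observed by an async timer, and the timer is assumed to start when the first item enters an empty buffer.

```rust
use std::time::Duration;

/// Hypothetical helper (not part of futures-batch): groups `(arrival_time, value)`
/// pairs into chunks. A chunk is closed when it holds `capacity` items, or when
/// the next item arrives at or after the chunk's deadline.
fn chunk_with_timeout<T>(
    events: Vec<(Duration, T)>,
    capacity: usize,
    timeout: Duration,
) -> Vec<Vec<T>> {
    assert!(capacity > 0, "capacity must be greater than zero");
    let mut chunks = Vec::new();
    let mut buffer: Vec<T> = Vec::with_capacity(capacity);
    // Deadline of the chunk being filled. Assumption: it starts at the chunk's first item.
    let mut deadline: Option<Duration> = None;

    for (arrived_at, value) in events {
        // Timeout flush: the deadline passed before this item arrived.
        if let Some(d) = deadline {
            if arrived_at >= d && !buffer.is_empty() {
                chunks.push(std::mem::replace(&mut buffer, Vec::with_capacity(capacity)));
                deadline = None;
            }
        }
        // The first item of a new chunk starts the timer.
        if buffer.is_empty() {
            deadline = Some(arrived_at + timeout);
        }
        buffer.push(value);
        // Capacity flush: the buffer is full.
        if buffer.len() == capacity {
            chunks.push(std::mem::replace(&mut buffer, Vec::with_capacity(capacity)));
            deadline = None;
        }
    }
    // End of input: emit whatever is left.
    if !buffer.is_empty() {
        chunks.push(buffer);
    }
    chunks
}
```

With a capacity of 5 and a timeout of 100 ms, items 1 to 5 arriving at 0, 10, 20, 500, and 510 ms are grouped as [1, 2, 3] and [4, 5]. An async adaptor would emit the first chunk as soon as its timer fires rather than when the next item arrives, but the grouping in this model is the same.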
§Try Chunks Timeout
For streams of Result values, you can use try_chunks_timeout to batch successful values and immediately propagate errors:
use std::time::Duration;
use futures::{stream, StreamExt};
use futures_batch::TryChunksTimeoutStreamExt;

#[tokio::main]
async fn main() {
    let results = stream::iter((0..10).map(|i| if i == 5 { Err("error") } else { Ok(i) }))
        .try_chunks_timeout(3, Duration::from_secs(10))
        .collect::<Vec<_>>()
        .await; // the stream is only driven once the future is awaited

    // Results in: [Ok([0, 1, 2]), Ok([3, 4]), Err("error"), Ok([6, 7, 8]), Ok([9])]
}
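The ordering in the result comment above (the partial chunk [3, 4] is emitted before the error, and batching resumes afterwards) can be modelled with a short synchronous sketch. This is an illustration inferred from that comment, not the crate's implementation: `try_chunk` is a hypothetical helper, it works on a finite Vec rather than a stream, and it ignores the timeout entirely.

```rust
/// Hypothetical helper (not part of futures-batch): batches `Ok` values into
/// chunks of at most `capacity` items and passes each `Err` through, emitting
/// any partially filled chunk first so the original order is preserved.
fn try_chunk<T, E>(items: Vec<Result<T, E>>, capacity: usize) -> Vec<Result<Vec<T>, E>> {
    assert!(capacity > 0, "capacity must be greater than zero");
    let mut out = Vec::new();
    let mut buffer: Vec<T> = Vec::with_capacity(capacity);

    for item in items {
        match item {
            Ok(value) => {
                buffer.push(value);
                // Capacity flush: the buffer is full.
                if buffer.len() == capacity {
                    out.push(Ok(std::mem::replace(&mut buffer, Vec::with_capacity(capacity))));
                }
            }
            Err(error) => {
                // Emit the values buffered so far, then the error itself.
                if !buffer.is_empty() {
                    out.push(Ok(std::mem::take(&mut buffer)));
                }
                out.push(Err(error));
            }
        }
    }
    // End of input: emit whatever is left.
    if !buffer.is_empty() {
        out.push(Ok(buffer));
    }
    out
}
```

Fed the same input as the example (0..10 with an error at position 5) and a capacity of 3, this model produces the sequence shown in the result comment.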
Structs§
- ChunksTimeout - A Stream of chunks.
- TryChunksTimeout - A Stream of chunks for Result streams that batches Ok values and immediately propagates errors.
Traits§
- ChunksTimeoutStreamExt - A Stream extension trait allowing you to call chunks_timeout on anything which implements Stream.
- TryChunksTimeoutStreamExt - A Try Stream extension trait allowing you to call try_chunks_timeout on anything which implements Stream<Item = Result<T, E>>.