pub struct AsyncPipeline<'a, S: Stream + 'a> { /* private fields */ }
Like a buffered stream, with the added guarantee that it won’t “snooze” futures.
Examples
The simplest use case is to create a pipeline from an input iterator
(from_iter) or an input stream
(from_stream) and then run it with
for_each_concurrent. The limit argument gives the
maximum number of futures that can run concurrently. Once the limit is reached, the pipeline
will wait for in-flight futures to finish before starting more. This example runs the async
closure 20 times in two groups of 10:

    AsyncPipeline::from_iter(0..20)
        .for_each_concurrent(
            async |i| {
                println!("starting {i}");
                sleep(Duration::from_secs(1)).await;
                println!("finished {i}");
            },
            10,
        )
        .await;

You can also add concurrent map or filter-map stages to the pipeline. All stages run
concurrently until the whole pipeline is finished. To preserve the pipeline order, use
map_concurrent or
filter_map_concurrent. If order doesn’t matter, use
map_unordered or
filter_map_unordered. Each of these also takes a
limit argument. When you don’t want a limit, you can use usize::MAX. This example uses a
filter-map stage to extract the even numbers, uses a map stage to multiply them by 10, and
collects the results into a vector:

    let outputs: Vec<u32> = AsyncPipeline::from_iter(0..20)
        .filter_map_concurrent(
            async |i| {
                println!("filter {i}");
                sleep(Duration::from_secs(1)).await;
                (i % 2 == 0).then_some(i)
            },
            10,
        )
        .map_concurrent(
            async |i| {
                println!("multiply {i}");
                sleep(Duration::from_secs(1)).await;
                i * 10
            },
            usize::MAX, // unlimited
        )
        .collect()
        .await;
    assert_eq!(outputs, [0, 20, 40, 60, 80, 100, 120, 140, 160, 180]);

The adapt_stream method lets you apply arbitrary stream
methods to the output of any stage. This is very flexible, though it doesn’t add concurrency.
(Please don’t use the buffered method here, since you might reintroduce the deadlocks that
this crate is trying carefully to avoid.) Here’s an example of using
chain to
add some extra elements both before and after a stage:

    use futures::{StreamExt, stream};

    let outputs: Vec<u32> = AsyncPipeline::from_iter([4, 5, 6])
        .map_concurrent(async |i| i * 10, usize::MAX)
        .adapt_stream(|outputs| {
            stream::iter([1, 2, 3])
                .chain(outputs)
                .chain(stream::iter([7, 8, 9]))
        })
        .map_concurrent(async |i| i * 10, usize::MAX)
        .collect()
        .await;
    assert_eq!(outputs, [10, 20, 30, 400, 500, 600, 70, 80, 90]);
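
To see why the result of the chain example mixes tens and hundreds, it may help to look at a synchronous analogue built from plain standard-library iterators. This is only an illustrative sketch: it does not use AsyncPipeline or streams at all, and the helper name `chained_outputs` is hypothetical. The elements 4, 5, 6 pass through both map stages (multiplied by 100 in total), while the chained-in elements only pass through the second stage (multiplied by 10).

```rust
/// Synchronous analogue of the chain example, using iterators instead of
/// streams. `chained_outputs` is a hypothetical name used for illustration.
fn chained_outputs() -> Vec<u32> {
    // First map stage: only 4, 5, 6 pass through it.
    let first_stage = vec![4u32, 5, 6].into_iter().map(|i| i * 10);

    // Splice extra elements in before and after the first stage's output,
    // mirroring what the closure passed to `adapt_stream` does with `chain`.
    let adapted = vec![1u32, 2, 3]
        .into_iter()
        .chain(first_stage)
        .chain(vec![7u32, 8, 9]);

    // Second map stage: every element, including the spliced-in ones,
    // passes through it, and the order is preserved.
    adapted.map(|i| i * 10).collect()
}

fn main() {
    let outputs = chained_outputs();
    assert_eq!(outputs, [10, 20, 30, 400, 500, 600, 70, 80, 90]);
    println!("{:?}", outputs);
}
```

The iterator version runs everything sequentially, so it says nothing about concurrency limits; it only shows which elements each stage sees and in what order.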