Struct AsyncPipeline 

pub struct AsyncPipeline<'a, S: Stream + 'a> { /* private fields */ }

Like a buffered stream, with the added guarantee that it won’t “snooze” futures.

Examples

The simplest use case is to create a pipeline from an input iterator (from_iter) or an input stream (from_stream) and then run it with for_each_concurrent. The limit argument gives the maximum number of futures that can run concurrently. Once the limit is reached, the pipeline waits for an in-flight future to finish before starting another. Because every call here sleeps for the same one second, this example runs the async closure 20 times in two groups of 10:

AsyncPipeline::from_iter(0..20)
    .for_each_concurrent(
        async |i| {
            println!("starting {i}");
            sleep(Duration::from_secs(1)).await;
            println!("finished {i}");
        },
        10,
    )
    .await;
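
To make the effect of the limit argument concrete, here is a small standard-library model of the scheduling rule described above. It is only a sketch under stated assumptions, not this crate's implementation: the finish_times helper is hypothetical, time is measured in abstract ticks, and tasks are assumed to start in input order as soon as a slot is free.

```rust
use std::cmp::Reverse;
use std::collections::BinaryHeap;

/// Toy model (hypothetical helper, not part of this crate): given how long
/// each task takes and a concurrency limit, return the tick at which each
/// task finishes, assuming tasks start in input order as slots free up.
fn finish_times(durations: &[u64], limit: usize) -> Vec<u64> {
    assert!(limit > 0, "limit must be at least 1");
    // Min-heap of finish times for the tasks currently in flight.
    let mut in_flight: BinaryHeap<Reverse<u64>> = BinaryHeap::new();
    let mut now: u64 = 0;
    let mut finished = Vec::with_capacity(durations.len());
    for &duration in durations {
        if in_flight.len() == limit {
            // At the limit: wait for the earliest in-flight task to finish.
            let Reverse(earliest) = in_flight.pop().unwrap();
            now = now.max(earliest);
        }
        let done = now + duration;
        in_flight.push(Reverse(done));
        finished.push(done);
    }
    finished
}
```

In this model, 20 tasks of one tick each with a limit of 10 finish as ten tasks at tick 1 followed by ten at tick 2, which matches the "two groups of 10" behavior described above. With usize::MAX as the limit, all 20 finish at tick 1.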

You can also add concurrent map or filter-map stages to the pipeline. All stages run concurrently until the whole pipeline is finished. To preserve the pipeline order, use map_concurrent or filter_map_concurrent. If order doesn’t matter, use map_unordered or filter_map_unordered. Each of these also takes a limit argument. When you don’t want a limit, you can use usize::MAX. This example uses a filter-map stage to extract the even numbers, uses a map stage to multiply them by 10, and collects the results into a vector:

let outputs: Vec<u32> = AsyncPipeline::from_iter(0..20)
    .filter_map_concurrent(
        async |i| {
            println!("filter {i}");
            sleep(Duration::from_secs(1)).await;
            (i % 2 == 0).then_some(i)
        },
        10,
    )
    .map_concurrent(
        async |i| {
            println!("multiply {i}");
            sleep(Duration::from_secs(1)).await;
            i * 10
        },
        usize::MAX, // unlimited
    )
    .collect()
    .await;
assert_eq!(outputs, [0, 20, 40, 60, 80, 100, 120, 140, 160, 180]);
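
The difference between the order-preserving and unordered stages can be pictured with a toy model. This sketch assumes that the unordered variants yield outputs as their futures complete (as buffer_unordered does in the futures crate); the page does not spell that out, and both helper functions are hypothetical and not part of this crate.

```rust
/// Toy model, not the crate's implementation: each input is a
/// `(value, finish_time)` pair and all futures are assumed to start together.
/// An order-preserving stage yields values in input order.
fn ordered_outputs(inputs: &[(u32, u64)]) -> Vec<u32> {
    inputs.iter().map(|&(value, _)| value).collect()
}

/// An unordered stage is assumed to yield values as their futures finish.
fn unordered_outputs(inputs: &[(u32, u64)]) -> Vec<u32> {
    let mut by_finish = inputs.to_vec();
    // Stable sort: ties keep their input order.
    by_finish.sort_by_key(|&(_, finish_time)| finish_time);
    by_finish.into_iter().map(|(value, _)| value).collect()
}
```

For inputs whose futures would finish at ticks 30, 10, and 20, the ordered model returns the values in their original order, while the unordered model returns the second value first. In the example above every future sleeps for the same second, so the choice would mainly matter when durations differ.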

The adapt_stream method lets you apply arbitrary stream methods to the output of any stage. This is very flexible, though it doesn’t add concurrency. (Please don’t use the buffered method here, since you might reintroduce the deadlocks that this crate is trying carefully to avoid.) Here’s an example of using chain to add some extra elements both before and after a stage:

use futures::{StreamExt, stream};

let outputs: Vec<u32> = AsyncPipeline::from_iter([4, 5, 6])
    .map_concurrent(async |i| i * 10, usize::MAX)
    .adapt_stream(|outputs| {
        stream::iter([1, 2, 3])
            .chain(outputs)
            .chain(stream::iter([7, 8, 9]))
    })
    .map_concurrent(async |i| i * 10, usize::MAX)
    .collect()
    .await;
assert_eq!(outputs, [10, 20, 30, 400, 500, 600, 70, 80, 90]);
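
As a way to check the data flow in the chain example, the same values can be produced with plain synchronous iterators. This is only an analogue for tracing which elements pass through which stage; it has no concurrency, and the chained_outputs function is a hypothetical name used for illustration.

```rust
/// Synchronous analogue of the `adapt_stream` example: the first map stage
/// only sees 4, 5, 6; the chained elements join afterwards; the second map
/// stage sees everything.
fn chained_outputs() -> Vec<u32> {
    // First stage: multiply the original inputs by 10.
    let stage_one = vec![4u32, 5, 6].into_iter().map(|i| i * 10);
    vec![1u32, 2, 3]
        .into_iter()
        .chain(stage_one) // elements added before and after the stage output
        .chain(vec![7, 8, 9])
        .map(|i| i * 10) // second stage applies to all nine elements
        .collect()
}
```

The elements 4, 5, and 6 are multiplied twice, while the chained elements only pass through the second stage.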

Implementations

impl<'a, I: Iterator> AsyncPipeline<'a, Iter<I>>

pub fn from_iter(iter: impl IntoIterator<IntoIter = I>) -> Self

impl<'a, S: Stream> AsyncPipeline<'a, S>

pub fn from_stream(stream: S) -> Self

pub async fn for_each(self, f: impl AsyncFnMut(S::Item))

pub async fn for_each_concurrent(self, f: impl AsyncFn(S::Item), limit: usize)
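
The two for_each methods differ in their closure bound: for_each takes AsyncFnMut, while for_each_concurrent takes AsyncFn. An AsyncFn closure is called through a shared reference, so it cannot mutate captured variables directly and needs interior mutability to accumulate state. The sketch below shows the same distinction with the synchronous Fn and FnMut traits as an analogy; the function names are hypothetical and nothing here is this crate's code.

```rust
use std::cell::Cell;

/// Analogue of an `AsyncFn` parameter: callable through a shared reference,
/// which is what allows several calls to be in flight at once.
fn call_shared(f: impl Fn(u32)) {
    f(1);
    f(2);
}

/// Analogue of an `AsyncFnMut` parameter: each call needs exclusive access,
/// so calls happen one at a time.
fn call_exclusive(mut f: impl FnMut(u32)) {
    f(1);
    f(2);
}

/// Sum the arguments both ways and return `(shared_total, exclusive_total)`.
fn totals() -> (u32, u32) {
    // A shared closure has to go through `Cell` (or `RefCell`) to update state.
    let shared_total = Cell::new(0u32);
    call_shared(|i| shared_total.set(shared_total.get() + i));

    // An exclusive closure can mutate a captured variable directly.
    let mut exclusive_total = 0u32;
    call_exclusive(|i| exclusive_total += i);

    (shared_total.get(), exclusive_total)
}
```

Single-threaded interior mutability such as Cell is a natural fit here, since the auto trait list on this page shows that AsyncPipeline is neither Send nor Sync.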

pub async fn collect<C: Default + Extend<S::Item>>(self) -> C
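
Note that collect is bounded by Default + Extend rather than FromIterator, the trait that Iterator::collect uses. The following sketch shows which standard containers satisfy that bound, using a hypothetical synchronous helper named collect_into. How this crate fills the container internally is not stated on this page, so the helper body is an assumption.

```rust
use std::collections::BTreeSet;

/// Stand-in for the bound on `collect` (hypothetical helper): any container
/// that can start out empty (`Default`) and absorb items (`Extend`).
fn collect_into<C, T>(items: impl IntoIterator<Item = T>) -> C
where
    C: Default + Extend<T>,
{
    let mut out = C::default();
    out.extend(items);
    out
}

/// Collect the same items into a vector (keeps duplicates and order) and a
/// set (deduplicates and sorts), plus characters into a `String`.
fn collect_examples() -> (Vec<u32>, BTreeSet<u32>, String) {
    let as_vec: Vec<u32> = collect_into(vec![3, 1, 3]);
    let as_set: BTreeSet<u32> = collect_into(vec![3, 1, 3]);
    let as_text: String = collect_into(vec!['o', 'k']);
    (as_vec, as_set, as_text)
}
```

Vec, BTreeSet, and String all implement both traits, so under this bound the target type is chosen by the annotation at the call site, as in the Vec<u32> examples above.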

pub fn adapt_stream<F, S2>(self, f: F) -> AsyncPipeline<'a, S2>
where
    F: FnOnce(S) -> S2,
    S2: Stream,

pub fn map_concurrent<F, Fut, U>(
    self,
    f: F,
    limit: usize,
) -> AsyncPipeline<'a, impl Stream<Item = U>>
where
    F: FnMut(S::Item) -> Fut + 'a,
    Fut: Future<Output = U> + 'a,
    U: 'a,

pub fn map_unordered<F, Fut, U>(
    self,
    f: F,
    limit: usize,
) -> AsyncPipeline<'a, impl Stream<Item = U>>
where
    F: FnMut(S::Item) -> Fut + 'a,
    Fut: Future<Output = U> + 'a,
    U: 'a,

pub fn filter_map_concurrent<F, Fut, U>(
    self,
    f: F,
    limit: usize,
) -> AsyncPipeline<'a, impl Stream<Item = U>>
where
    F: FnMut(S::Item) -> Fut + 'a,
    Fut: Future<Output = Option<U>> + 'a,
    U: 'a,

pub fn filter_map_unordered<F, Fut, U>(
    self,
    f: F,
    limit: usize,
) -> AsyncPipeline<'a, impl Stream<Item = U>>
where
    F: FnMut(S::Item) -> Fut + 'a,
    Fut: Future<Output = Option<U>> + 'a,
    U: 'a,

Auto Trait Implementations

impl<'a, S> Freeze for AsyncPipeline<'a, S>
where S: Freeze,

impl<'a, S> !RefUnwindSafe for AsyncPipeline<'a, S>

impl<'a, S> !Send for AsyncPipeline<'a, S>

impl<'a, S> !Sync for AsyncPipeline<'a, S>

impl<'a, S> Unpin for AsyncPipeline<'a, S>
where S: Unpin,

impl<'a, S> !UnwindSafe for AsyncPipeline<'a, S>

Blanket Implementations

impl<T> Any for T
where T: 'static + ?Sized,

fn type_id(&self) -> TypeId

Gets the TypeId of self.

impl<T> Borrow<T> for T
where T: ?Sized,

fn borrow(&self) -> &T

Immutably borrows from an owned value.

impl<T> BorrowMut<T> for T
where T: ?Sized,

fn borrow_mut(&mut self) -> &mut T

Mutably borrows from an owned value.

impl<T> From<T> for T

fn from(t: T) -> T

Returns the argument unchanged.

impl<T, U> Into<U> for T
where U: From<T>,

fn into(self) -> U

Calls U::from(self).

That is, this conversion is whatever the implementation of From<T> for U chooses to do.

impl<T, U> TryFrom<U> for T
where U: Into<T>,

type Error = Infallible

The type returned in the event of a conversion error.

fn try_from(value: U) -> Result<T, <T as TryFrom<U>>::Error>

Performs the conversion.

impl<T, U> TryInto<U> for T
where U: TryFrom<T>,

type Error = <U as TryFrom<T>>::Error

The type returned in the event of a conversion error.

fn try_into(self) -> Result<U, <U as TryFrom<T>>::Error>

Performs the conversion.