pub trait ParStreamExt {
// Required methods
fn spawned<B>(self, buf_size: B) -> RecvStream<'static, Self::Item>
where B: Into<BufSize>;
fn map_blocking<B, T, F>(self, buf_size: B, f: F) -> RecvStream<'static, T>
where B: Into<BufSize>,
T: Send,
F: 'static + Send + FnMut(Self::Item) -> T;
fn pull_routing<B, K, Q, F>(
self,
buf_size: B,
key_fn: F,
) -> PullBuilder<Self, K, F, Q>
where Self: 'static + Send + Stream,
Self::Item: 'static + Send,
F: 'static + Send + FnMut(&Self::Item) -> Q,
K: 'static + Send + Hash + Eq + Borrow<Q>,
Q: Send + Hash + Eq,
B: Into<BufSize>;
fn par_builder(self) -> ParBuilder<Self>;
fn par_batching<T, P, F, Fut>(
self,
params: P,
f: F,
) -> RecvStream<'static, T>
where Self: Sized,
F: 'static + Send + Clone + FnMut(usize, Receiver<Self::Item>) -> Fut,
Fut: 'static + Future<Output = Option<(T, Receiver<Self::Item>)>> + Send,
T: 'static + Send,
P: Into<ParParams>;
fn tee<B>(self, buf_size: B) -> Tee<Self::Item>
where Self::Item: Clone,
B: Into<BufSize>;
fn broadcast<B>(
self,
buf_size: B,
send_all: bool,
) -> BroadcastBuilder<Self::Item>
where Self::Item: Clone,
B: Into<BufSize>;
fn par_then<T, P, F, Fut>(self, params: P, f: F) -> ParThen<T>
where T: 'static + Send,
F: 'static + FnMut(Self::Item) -> Fut + Send,
Fut: 'static + Future<Output = T> + Send,
P: Into<ParParams>;
fn par_then_unordered<T, P, F, Fut>(
self,
params: P,
f: F,
) -> RecvStream<'static, T>
where T: 'static + Send,
F: 'static + FnMut(Self::Item) -> Fut + Send,
Fut: 'static + Future<Output = T> + Send,
P: Into<ParParams>;
fn par_map<T, P, F, Func>(self, params: P, f: F) -> ParMap<T>
where T: 'static + Send,
F: 'static + FnMut(Self::Item) -> Func + Send,
Func: 'static + FnOnce() -> T + Send,
P: Into<ParParams>;
fn par_map_unordered<T, P, F, Func>(
self,
params: P,
f: F,
) -> RecvStream<'static, T>
where T: 'static + Send,
F: 'static + FnMut(Self::Item) -> Func + Send,
Func: 'static + FnOnce() -> T + Send,
P: Into<ParParams>;
fn par_reduce<P, F, Fut>(
self,
params: P,
reduce_fn: F,
) -> BoxFuture<'static, Option<Self::Item>>
where P: Into<ParParams>,
F: 'static + FnMut(Self::Item, Self::Item) -> Fut + Send + Clone,
Fut: 'static + Future<Output = Self::Item> + Send;
fn par_for_each<P, F, Fut>(self, params: P, f: F) -> BoxFuture<'static, ()>
where F: 'static + FnMut(Self::Item) -> Fut + Send,
Fut: 'static + Future<Output = ()> + Send,
P: Into<ParParams>;
fn par_for_each_blocking<P, F, Func>(
self,
params: P,
f: F,
) -> BoxFuture<'static, ()>
where F: 'static + FnMut(Self::Item) -> Func + Send,
Func: 'static + FnOnce() + Send,
P: Into<ParParams>;
}
This trait extends Stream types with parallel processing combinators.
Required Methods
fn spawned<B>(self, buf_size: B) -> RecvStream<'static, Self::Item>
Moves the stream to a spawned worker and forwards the stream items to a channel of size buf_size.
It returns a receiver stream that buffers the items. The receiver stream is cloneable, and items are distributed to the receivers in an anycast manner.
This combinator is similar to shared(). The difference is that spawned() spawns a worker that actively forwards stream items to the channel, and the receivers share that channel, while the shared() combinator directly polls the underlying stream, in a lock-free manner, whenever a receiver polls. The choice between these combinators depends on performance considerations.
use futures::prelude::*;
use par_stream::prelude::*;
// Creates two sharing handles to the stream
let stream = stream::iter(0..100);
let recv1 = stream.spawned(None); // spawn with default buffer size
let recv2 = recv1.clone(); // creates the second receiver
// Consumes the shared streams individually
let collect1 = par_stream::rt::spawn(recv1.collect());
let collect2 = par_stream::rt::spawn(recv2.collect());
let (vec1, vec2): (Vec<_>, Vec<_>) = futures::join!(collect1, collect2);
// Checks that the combined values of two vecs are equal to original values
let mut all_vec: Vec<_> = vec1.into_iter().chain(vec2).collect();
all_vec.sort();
itertools::assert_equal(all_vec, 0..100);
fn map_blocking<B, T, F>(self, buf_size: B, f: F) -> RecvStream<'static, T>
Maps this stream's items to a different type on a blocking thread.
The combinator iteratively maps the stream items and places the output items into a channel of size buf_size. The function f is executed on a separate blocking thread to avoid blocking the asynchronous runtime.
use futures::{prelude::*, stream};
use par_stream::prelude::*;
let vec: Vec<_> = stream::iter(0..100)
.map_blocking(None, |_| {
// runs CPU-bound work here
(0..1000).sum::<u64>()
})
.collect()
.await;
fn pull_routing<B, K, Q, F>(self, buf_size: B, key_fn: F) -> PullBuilder<Self, K, F, Q>
Creates a builder that routes each input item to a destination receiver according to key_fn.
Call builder.register("key") to obtain the receiving stream for that key. The builder must be finished with builder.build() so that the receivers start consuming items. builder.build() also returns a special leaking receiver for items whose key is not registered or whose target receiver is closed. Dropping the builder without calling builder.build() causes the receivers to get empty input.
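Based on the workflow above, the following is a minimal usage sketch. The register() and build() calls mirror the description; the even/odd keys, the key closure, and the way the receivers are collected are illustrative assumptions rather than the crate's documented example.
use futures::prelude::*;
use par_stream::prelude::*;
// NOTE: a sketch based on the description above; the exact signatures of
// register() and build() may differ from this illustration.
let mut builder = stream::iter(0i32..10).pull_routing(None, |value: &i32| {
    if value % 2 == 0 {
        "even"
    } else {
        "odd"
    }
});
let even_rx = builder.register("even"); // receives items routed to "even"
let odd_rx = builder.register("odd"); // receives items routed to "odd"
let _leak_rx = builder.build(); // receivers start consuming only after build()
let (evens, odds): (Vec<_>, Vec<_>) = futures::join!(even_rx.collect(), odd_rx.collect());
assert_eq!(evens, vec![0, 2, 4, 6, 8]);
assert_eq!(odds, vec![1, 3, 5, 7, 9]);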
fn par_builder(self) -> ParBuilder<Self>
Creates a builder that sets up parallel tasks.
fn par_batching<T, P, F, Fut>(self, params: P, f: F) -> RecvStream<'static, T>
The combinator maintains a collection of concurrent workers; each worker may consume as many input elements as it needs in order to produce each output element.
use futures::prelude::*;
use par_stream::prelude::*;
let data = vec![1, 2, -3, 4, 5, -6, 7, 8];
stream::iter(data).par_batching(None, |_worker_index, rx| async move {
while let Ok(value) = rx.recv_async().await {
if value > 0 {
return Some((value, rx));
}
}
None
});
fn tee<B>(self, buf_size: B) -> Tee<Self::Item>
Converts the stream into cloneable receivers, each of which receives a copy of every input item.
It spawns a task that consumes the stream and forwards item copies to the receivers. The buf_size sets the internal channel size. Dropping a receiver does not cause the other receivers to stop.
Receivers are not guaranteed to see the same initial item because they may be created at different times. Use broadcast() instead if you need this guarantee.
use futures::{join, prelude::*};
use par_stream::prelude::*;
let orig: Vec<_> = (0..1000).collect();
let rx1 = stream::iter(orig.clone()).tee(1);
let rx2 = rx1.clone();
let rx3 = rx1.clone();
let fut1 = rx1.map(|val| val).collect();
let fut2 = rx2.map(|val| val * 2).collect();
let fut3 = rx3.map(|val| val * 3).collect();
let (vec1, vec2, vec3): (Vec<_>, Vec<_>, Vec<_>) = join!(fut1, fut2, fut3);
fn broadcast<B>(self, buf_size: B, send_all: bool) -> BroadcastBuilder<Self::Item>
Creates a builder to register broadcast receivers.
Call builder.register() to create a receiver. Once registration is done, builder.build() must be called so that the receivers start consuming item copies. If the builder is dropped without calling build(), the receivers get empty input.
Each receiver maintains an internal buffer of buf_size. The send_all flag configures the behavior when one of the receivers closes: if send_all is true, closing one receiver causes the other receivers to stop; otherwise it does not.
use futures::{join, prelude::*};
use par_stream::prelude::*;
let mut builder = stream::iter(0..).broadcast(2, true);
let rx1 = builder.register();
let rx2 = builder.register();
builder.build();
let (ret1, ret2): (Vec<_>, Vec<_>) = join!(rx1.take(100).collect(), rx2.take(100).collect());
let expect: Vec<_> = (0..100).collect();
assert_eq!(ret1, expect);
assert_eq!(ret2, expect);
fn par_then<T, P, F, Fut>(self, params: P, f: F) -> ParThen<T>
Runs an asynchronous task on parallel workers and produces items respecting the input order.
The params sets the worker pool size and output buffer size. Each parallel worker shares the stream and executes a future for each input item. Output items are gathered into a channel and reordered to match the input order.
use futures::prelude::*;
use par_stream::prelude::*;
let doubled: Vec<_> = stream::iter(0..1000)
// doubles the values in parallel
.par_then(None, move |value| async move { value * 2 })
// the collected values will be ordered
.collect()
.await;
let expect: Vec<_> = (0..1000).map(|value| value * 2).collect();
assert_eq!(doubled, expect);
fn par_then_unordered<T, P, F, Fut>(self, params: P, f: F) -> RecvStream<'static, T>
Runs an asynchronous task on parallel workers and produces items without respecting the input order.
The params sets the worker pool size and output buffer size. Each parallel worker shares the stream and executes a future for each input item. The worker forwards the output to a channel as soon as it finishes.
use futures::prelude::*;
use par_stream::prelude::*;
use std::collections::HashSet;
let doubled: HashSet<_> = stream::iter(0..1000)
// doubles the values in parallel
.par_then_unordered(None, move |value| {
// the future is sent to a parallel worker
async move { value * 2 }
})
// the collected values may NOT be ordered
.collect()
.await;
let expect: HashSet<_> = (0..1000).map(|value| value * 2).collect();
assert_eq!(doubled, expect);
fn par_map<T, P, F, Func>(self, params: P, f: F) -> ParMap<T>
Runs a blocking task on parallel workers and produces items respecting the input order.
The params sets the worker pool size and output buffer size. Each parallel worker shares the stream and executes the closure for each input item. Output items are gathered into a channel and reordered to match the input order.
use futures::prelude::*;
use par_stream::prelude::*;
let doubled: Vec<_> = stream::iter(0..1000)
// doubles the values in parallel
.par_map(None, move |value| {
// the closure is sent to a parallel worker
move || value * 2
})
// the collected values preserve the input order
.collect()
.await;
let expect: Vec<_> = (0..1000).map(|value| value * 2).collect();
assert_eq!(doubled, expect);
fn par_map_unordered<T, P, F, Func>(self, params: P, f: F) -> RecvStream<'static, T>
Runs a blocking task on parallel workers and produces items without respecting the input order.
The params sets the worker pool size and output buffer size. Each parallel worker shares the stream and executes the closure for each input item. The worker forwards the output to a channel as soon as it finishes.
use futures::prelude::*;
use par_stream::prelude::*;
use std::collections::HashSet;
let doubled: HashSet<_> = stream::iter(0..1000)
// doubles the values in parallel
.par_map_unordered(None, move |value| {
// the closure is sent to a parallel worker
move || value * 2
})
// the collected values may NOT be ordered
.collect()
.await;
let expect: HashSet<_> = (0..1000).map(|value| value * 2).collect();
assert_eq!(doubled, expect);
fn par_reduce<P, F, Fut>(self, params: P, reduce_fn: F) -> BoxFuture<'static, Option<Self::Item>>
Reduces the input stream into a single value in parallel.
It maintains a pool of num_workers parallel workers. Each worker reduces the input items it takes from the stream into a single value. Once all parallel workers finish, the per-worker values are reduced into one in a tree-fold manner.
use futures::prelude::*;
use par_stream::prelude::*;
let sum = stream::iter(1..=1000)
// sums up the values in parallel
.par_reduce(None, move |lhs, rhs| {
// the future is sent to a parallel worker
async move { lhs + rhs }
})
.await;
assert_eq!(sum, Some((1 + 1000) * 1000 / 2));
fn par_for_each<P, F, Fut>(self, params: P, f: F) -> BoxFuture<'static, ()>
Runs an asynchronous task on parallel workers.
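The description here carries no example; the following is a minimal sketch of par_for_each(), assuming a shared Arc<AtomicUsize> counter so the side effects are observable. The counter and its handling are illustrative assumptions; only the combinator call itself follows the signature above.
use futures::prelude::*;
use par_stream::prelude::*;
use std::sync::{
    atomic::{AtomicUsize, Ordering::SeqCst},
    Arc,
};
// the shared counter is an illustrative assumption, not part of the API
let sum = Arc::new(AtomicUsize::new(0));
{
    let sum = sum.clone();
    stream::iter(1..=1000usize)
        // runs the asynchronous task on parallel workers
        .par_for_each(None, move |value| {
            let sum = sum.clone();
            async move {
                sum.fetch_add(value, SeqCst);
            }
        })
        .await;
}
assert_eq!(sum.load(SeqCst), (1 + 1000) * 1000 / 2);
fn par_for_each_blocking<P, F, Func>(self, params: P, f: F) -> BoxFuture<'static, ()>
Runs a blocking task on parallel workers.
A minimal sketch, analogous to the par_for_each() example above, except that f returns a blocking closure instead of a future; again, the shared counter is only illustrative.
use futures::prelude::*;
use par_stream::prelude::*;
use std::sync::{
    atomic::{AtomicUsize, Ordering::SeqCst},
    Arc,
};
let sum = Arc::new(AtomicUsize::new(0));
{
    let sum = sum.clone();
    stream::iter(1..=1000usize)
        // runs the blocking closure on parallel workers
        .par_for_each_blocking(None, move |value| {
            let sum = sum.clone();
            move || {
                sum.fetch_add(value, SeqCst);
            }
        })
        .await;
}
assert_eq!(sum.load(SeqCst), (1 + 1000) * 1000 / 2);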
Dyn Compatibility
This trait is not dyn compatible.
In older versions of Rust, dyn compatibility was called "object safety", so this trait is not object safe.