Trait par_stream::prelude::ParStreamExt
source · [−]pub trait ParStreamExt where
Self: 'static + Send + Stream,
Self::Item: 'static + Send, {
Show 14 methods
fn spawned<B>(self, buf_size: B) -> RecvStream<'static, Self::Item>
where
B: Into<BufSize>;
fn map_blocking<B, T, F>(self, buf_size: B, f: F) -> RecvStream<'static, T>
where
B: Into<BufSize>,
T: Send,
F: 'static + Send + FnMut(Self::Item) -> T;
fn pull_routing<B, K, Q, F>(
self,
buf_size: B,
key_fn: F
) -> PullBuilder<Self, K, F, Q>
where
Self: 'static + Send + Stream,
Self::Item: 'static + Send,
F: 'static + Send + FnMut(&Self::Item) -> Q,
K: 'static + Send + Hash + Eq + Borrow<Q>,
Q: Send + Hash + Eq,
B: Into<BufSize>;
fn par_builder(self) -> ParBuilder<Self>;
fn par_batching<T, P, F, Fut>(
self,
params: P,
f: F
) -> RecvStream<'static, T>
where
Self: Sized,
F: 'static + Send + Clone + FnMut(usize, Receiver<Self::Item>) -> Fut,
Fut: 'static + Future<Output = Option<(T, Receiver<Self::Item>)>> + Send,
T: 'static + Send,
P: Into<ParParams>;
fn tee<B>(self, buf_size: B) -> Tee<Self::Item>
where
Self::Item: Clone,
B: Into<BufSize>;
fn broadcast<B>(
self,
buf_size: B,
send_all: bool
) -> BroadcastBuilder<Self::Item>
where
Self::Item: Clone,
B: Into<BufSize>;
fn par_then<T, P, F, Fut>(self, params: P, f: F) -> ParThen<T>
where
T: 'static + Send,
F: 'static + FnMut(Self::Item) -> Fut + Send,
Fut: 'static + Future<Output = T> + Send,
P: Into<ParParams>;
fn par_then_unordered<T, P, F, Fut>(
self,
params: P,
f: F
) -> RecvStream<'static, T>
where
T: 'static + Send,
F: 'static + FnMut(Self::Item) -> Fut + Send,
Fut: 'static + Future<Output = T> + Send,
P: Into<ParParams>;
fn par_map<T, P, F, Func>(self, params: P, f: F) -> ParMap<T>
where
T: 'static + Send,
F: 'static + FnMut(Self::Item) -> Func + Send,
Func: 'static + FnOnce() -> T + Send,
P: Into<ParParams>;
fn par_map_unordered<T, P, F, Func>(
self,
params: P,
f: F
) -> RecvStream<'static, T>
where
T: 'static + Send,
F: 'static + FnMut(Self::Item) -> Func + Send,
Func: 'static + FnOnce() -> T + Send,
P: Into<ParParams>;
fn par_reduce<P, F, Fut>(
self,
params: P,
reduce_fn: F
) -> BoxFuture<'static, Option<Self::Item>>
where
P: Into<ParParams>,
F: 'static + FnMut(Self::Item, Self::Item) -> Fut + Send + Clone,
Fut: 'static + Future<Output = Self::Item> + Send;
fn par_for_each<P, F, Fut>(self, params: P, f: F) -> BoxFuture<'static, ()>
where
F: 'static + FnMut(Self::Item) -> Fut + Send,
Fut: 'static + Future<Output = ()> + Send,
P: Into<ParParams>;
fn par_for_each_blocking<P, F, Func>(
self,
params: P,
f: F
) -> BoxFuture<'static, ()>
where
F: 'static + FnMut(Self::Item) -> Func + Send,
Func: 'static + FnOnce() + Send,
P: Into<ParParams>;
}
Expand description
The trait extends Stream types with parallel processing combinators.
Required methods
fn spawned<B>(self, buf_size: B) -> RecvStream<'static, Self::Item> where
B: Into<BufSize>,
fn spawned<B>(self, buf_size: B) -> RecvStream<'static, Self::Item> where
B: Into<BufSize>,
Moves the stream to a spawned worker and forwards stream items to a channel with buf_size
.
It returns a receiver stream that buffers the items. The receiver stream is cloneable so that items are sent in anycast manner.
This combinator is similar to shared().
The difference is that spawned()
spawns a worker that actively forwards stream
items to the channel, and the receivers shares the channel. The shared()
combinator
directly poll the underlying stream whenever a receiver polls in lock-free manner.
The choice of these combinator depends on the performance considerations.
use futures::prelude::*;
use par_stream::prelude::*;
// Creates two sharing handles to the stream
let stream = stream::iter(0..100);
let recv1 = stream.spawned(None); // spawn with default buffer size
let recv2 = recv1.clone(); // creates the second receiver
// Consumes the shared streams individually
let collect1 = par_stream::rt::spawn(recv1.collect());
let collect2 = par_stream::rt::spawn(recv2.collect());
let (vec1, vec2): (Vec<_>, Vec<_>) = futures::join!(collect1, collect2);
// Checks that the combined values of two vecs are equal to original values
let mut all_vec: Vec<_> = vec1.into_iter().chain(vec2).collect();
all_vec.sort();
itertools::assert_equal(all_vec, 0..100);
fn map_blocking<B, T, F>(self, buf_size: B, f: F) -> RecvStream<'static, T> where
B: Into<BufSize>,
T: Send,
F: 'static + Send + FnMut(Self::Item) -> T,
fn map_blocking<B, T, F>(self, buf_size: B, f: F) -> RecvStream<'static, T> where
B: Into<BufSize>,
T: Send,
F: 'static + Send + FnMut(Self::Item) -> T,
Maps this stream’s items to a different type on an blocking thread.
The combinator iteratively maps the stream items and places the output
items to a channel with buf_size
. The function f
is executed on a
separate blocking thread to prevent from blocking the asynchronous runtime.
use futures::{prelude::*, stream};
use par_stream::prelude::*;
let vec: Vec<_> = stream::iter(0..100)
.map_blocking(None, |_| {
// runs a CPU-bounded work here
(0..1000).sum::<u64>()
})
.collect()
.await;
Creates a builder that routes each input item according to key_fn
to a destination receiver.
Call builder.register("key")
to obtain the receiving stream for that key.
The builder must be finished by builder.build()
so that receivers start
consuming items. builder.build()
also returns a special leaking receiver
for items which key is not registered or target receiver is closed. Dropping the builder without
builder.build()
will cause receivers to get empty input.
fn par_builder(self) -> ParBuilder<Self>
fn par_builder(self) -> ParBuilder<Self>
Creates a builder that setups parallel tasks.
The combinator maintains a collection of concurrent workers, each consuming as many elements as it likes, for each output element.
use futures::prelude::*;
use par_stream::prelude::*;
let data = vec![1, 2, -3, 4, 5, -6, 7, 8];
stream::iter(data).par_batching(None, |_worker_index, rx| async move {
while let Ok(value) = rx.recv_async().await {
if value > 0 {
return Some((value, rx));
}
}
None
});
Converts the stream to cloneable receivers, each receiving a copy for each input item.
It spawns a task to consume the stream, and forwards item copies to receivers.
The buf_size
sets the interal channel size. Dropping a receiver does not cause another
receiver to stop.
Receivers are not guaranteed to get the same initial item due to the time difference among receiver creation time. Use broadcast() instead if you need this guarantee.
use futures::{join, prelude::*};
use par_stream::prelude::*;
let orig: Vec<_> = (0..1000).collect();
let rx1 = stream::iter(orig.clone()).tee(1);
let rx2 = rx1.clone();
let rx3 = rx1.clone();
let fut1 = rx1.map(|val| val).collect();
let fut2 = rx2.map(|val| val * 2).collect();
let fut3 = rx3.map(|val| val * 3).collect();
let (vec1, vec2, vec3): (Vec<_>, Vec<_>, Vec<_>) = join!(fut1, fut2, fut3);
Creates a builder to register broadcast receivers.
Call builder.register() to create a receiver. Once the registration is done. builder.build() must be called so that receivers start comsuming item copies. If the builder is droppped without build, receivers get empty input.
Each receiver maintains an internal buffer of buf_size
. The send_all
configures
the behavior if any one of receiver closes. If send_all
is true, closing of one receiver
casues the other receivers to stop, otherwise it does not.
use futures::{join, prelude::*};
use par_stream::prelude::*;
let mut builder = stream::iter(0..).broadcast(2, true);
let rx1 = builder.register();
let rx2 = builder.register();
builder.build();
let (ret1, ret2): (Vec<_>, Vec<_>) = join!(rx1.take(100).collect(), rx2.take(100).collect());
let expect: Vec<_> = (0..100).collect();
assert_eq!(ret1, expect);
assert_eq!(ret2, expect);
Runs an asynchronous task on parallel workers and produces items respecting the input order.
The params
sets the worker pool size and output buffer size.
Each parallel worker shares the stream and executes a future for each input item.
Output items are gathered to a channel and are reordered respecting to input order.
use futures::prelude::*;
use par_stream::prelude::*;
let doubled: Vec<_> = stream::iter(0..1000)
// doubles the values in parallel
.par_then(None, move |value| async move { value * 2 })
// the collected values will be ordered
.collect()
.await;
let expect: Vec<_> = (0..1000).map(|value| value * 2).collect();
assert_eq!(doubled, expect);
fn par_then_unordered<T, P, F, Fut>(
self,
params: P,
f: F
) -> RecvStream<'static, T> where
T: 'static + Send,
F: 'static + FnMut(Self::Item) -> Fut + Send,
Fut: 'static + Future<Output = T> + Send,
P: Into<ParParams>,
fn par_then_unordered<T, P, F, Fut>(
self,
params: P,
f: F
) -> RecvStream<'static, T> where
T: 'static + Send,
F: 'static + FnMut(Self::Item) -> Fut + Send,
Fut: 'static + Future<Output = T> + Send,
P: Into<ParParams>,
Runs an asynchronous task on parallel workers and produces items without respecting input order.
The params
sets the worker pool size and output buffer size.
Each parallel worker shares the stream and executes a future for each input item.
The worker forwards the output to a channel as soon as it finishes.
use futures::prelude::*;
use par_stream::prelude::*;
use std::collections::HashSet;
let doubled: HashSet<_> = stream::iter(0..1000)
// doubles the values in parallel
.par_then_unordered(None, move |value| {
// the future is sent to a parallel worker
async move { value * 2 }
})
// the collected values may NOT be ordered
.collect()
.await;
let expect: HashSet<_> = (0..1000).map(|value| value * 2).collect();
assert_eq!(doubled, expect);
Runs a blocking task on parallel workers and produces items respecting the input order.
The params
sets the worker pool size and output buffer size.
Each parallel worker shares the stream and executes a future for each input item.
Output items are gathered to a channel and are reordered respecting to input order.
use futures::prelude::*;
use par_stream::prelude::*;
// the variable will be shared by parallel workers
let doubled: Vec<_> = stream::iter(0..1000)
// doubles the values in parallel
.par_map(None, move |value| {
// the closure is sent to parallel worker
move || value * 2
})
// the collected values may NOT be ordered
.collect()
.await;
let expect: Vec<_> = (0..1000).map(|value| value * 2).collect();
assert_eq!(doubled, expect);
fn par_map_unordered<T, P, F, Func>(
self,
params: P,
f: F
) -> RecvStream<'static, T> where
T: 'static + Send,
F: 'static + FnMut(Self::Item) -> Func + Send,
Func: 'static + FnOnce() -> T + Send,
P: Into<ParParams>,
fn par_map_unordered<T, P, F, Func>(
self,
params: P,
f: F
) -> RecvStream<'static, T> where
T: 'static + Send,
F: 'static + FnMut(Self::Item) -> Func + Send,
Func: 'static + FnOnce() -> T + Send,
P: Into<ParParams>,
Runs a blocking task on parallel workers and produces items without respecting input order.
The params
sets the worker pool size and output buffer size.
Each parallel worker shares the stream and executes a future for each input item.
The worker forwards the output to a channel as soon as it finishes.
use futures::prelude::*;
use par_stream::prelude::*;
use std::collections::HashSet;
// the variable will be shared by parallel workers
let doubled: HashSet<_> = stream::iter(0..1000)
// doubles the values in parallel
.par_map_unordered(None, move |value| {
// the closure is sent to parallel worker
move || value * 2
})
// the collected values may NOT be ordered
.collect()
.await;
let expect: HashSet<_> = (0..1000).map(|value| value * 2).collect();
assert_eq!(doubled, expect);
Reduces the input stream into a single value in parallel.
It maintains a parallel worker pool of num_workers
. Each worker reduces
the input items from the stream into a single value. Once all parallel worker
finish, the values from each worker are reduced into one in treefold manner.
use futures::prelude::*;
use par_stream::prelude::*;
// the variable will be shared by parallel workers
let sum = stream::iter(1..=1000)
// sum up the values in parallel
.par_reduce(None, move |lhs, rhs| {
// the closure is sent to parallel worker
async move { lhs + rhs }
})
.await;
assert_eq!(sum, Some((1 + 1000) * 1000 / 2));
Runs an asynchronous task on parallel workers.