use crate::{base, common::*, config::IntoParStreamParams, impls};
/// Builds a `ParGather` from a collection of streams.
///
/// This is a thin public entry point: both arguments are forwarded unchanged to
/// `impls::stream::par_gather`, which holds the actual implementation.
///
/// # Parameters
///
/// * `streams` - any iterable whose elements are streams. Each stream must be
///   `'static + Unpin + Send` and yield `Send` items.
/// * `buf_size` - an optional size; anything convertible into `Option<usize>`
///   is accepted, so a bare `usize` or `None` both work.
///   NOTE(review): presumably the capacity of an internal buffer, with `None`
///   selecting a default — confirm against `impls::stream::par_gather`.
///
/// # Returns
///
/// A `ParGather` whose item type is the item type of the input streams.
pub fn par_gather<S>(
    streams: impl IntoIterator<Item = S>,
    buf_size: impl Into<Option<usize>>,
) -> ParGather<S::Item>
where
    S: 'static + StreamExt + Unpin + Send,
    S::Item: Send,
{
    impls::stream::par_gather(streams, buf_size)
}
pub trait ParStreamExt {
fn par_then<T, F, Fut>(self, config: impl IntoParStreamParams, f: F) -> ParMap<T>
where
T: 'static + Send,
F: 'static + FnMut(Self::Item) -> Fut + Send,
Fut: 'static + Future<Output = T> + Send,
Self: 'static + StreamExt + Sized + Unpin + Send,
Self::Item: Send,
{
impls::stream::ParStreamExt::par_then(self, config, f)
}
fn par_then_init<T, B, InitF, MapF, Fut>(
self,
config: impl IntoParStreamParams,
init_f: InitF,
f: MapF,
) -> ParMap<T>
where
T: 'static + Send,
B: 'static + Send + Clone,
InitF: FnMut() -> B,
MapF: 'static + FnMut(B, Self::Item) -> Fut + Send,
Fut: 'static + Future<Output = T> + Send,
Self: 'static + StreamExt + Sized + Unpin + Send,
Self::Item: Send,
{
impls::stream::ParStreamExt::par_then_init(self, config, init_f, f)
}
fn par_then_unordered<T, F, Fut>(
self,
config: impl IntoParStreamParams,
f: F,
) -> ParMapUnordered<T>
where
T: 'static + Send,
F: 'static + FnMut(Self::Item) -> Fut + Send,
Fut: 'static + Future<Output = T> + Send,
Self: 'static + StreamExt + Sized + Unpin + Send,
Self::Item: Send,
{
impls::stream::ParStreamExt::par_then_unordered(self, config, f)
}
fn par_then_init_unordered<T, B, InitF, MapF, Fut>(
self,
config: impl IntoParStreamParams,
init_f: InitF,
map_f: MapF,
) -> ParMapUnordered<T>
where
T: 'static + Send,
B: 'static + Send + Clone,
InitF: FnMut() -> B,
MapF: 'static + FnMut(B, Self::Item) -> Fut + Send,
Fut: 'static + Future<Output = T> + Send,
Self: 'static + StreamExt + Sized + Unpin + Send,
Self::Item: Send,
{
impls::stream::ParStreamExt::par_then_init_unordered(self, config, init_f, map_f)
}
fn par_map<T, F, Func>(self, config: impl IntoParStreamParams, f: F) -> ParMap<T>
where
T: 'static + Send,
F: 'static + FnMut(Self::Item) -> Func + Send,
Func: 'static + FnOnce() -> T + Send,
Self: 'static + StreamExt + Sized + Unpin + Send,
Self::Item: Send,
{
impls::stream::ParStreamExt::par_map(self, config, f)
}
fn par_map_init<T, B, InitF, MapF, Func>(
self,
config: impl IntoParStreamParams,
init_f: InitF,
f: MapF,
) -> ParMap<T>
where
T: 'static + Send,
B: 'static + Send + Clone,
InitF: FnMut() -> B,
MapF: 'static + FnMut(B, Self::Item) -> Func + Send,
Func: 'static + FnOnce() -> T + Send,
Self: 'static + StreamExt + Sized + Unpin + Send,
Self::Item: Send,
{
impls::stream::ParStreamExt::par_map_init(self, config, init_f, f)
}
fn par_map_unordered<T, F, Func>(
self,
config: impl IntoParStreamParams,
f: F,
) -> ParMapUnordered<T>
where
T: 'static + Send,
F: 'static + FnMut(Self::Item) -> Func + Send,
Func: 'static + FnOnce() -> T + Send,
Self: 'static + StreamExt + Sized + Unpin + Send,
Self::Item: Send,
{
impls::stream::ParStreamExt::par_map_unordered(self, config, f)
}
fn par_map_init_unordered<T, B, InitF, MapF, Func>(
self,
config: impl IntoParStreamParams,
init_f: InitF,
f: MapF,
) -> ParMapUnordered<T>
where
T: 'static + Send,
B: 'static + Send + Clone,
InitF: FnMut() -> B,
MapF: 'static + FnMut(B, Self::Item) -> Func + Send,
Func: 'static + FnOnce() -> T + Send,
Self: 'static + StreamExt + Sized + Unpin + Send,
Self::Item: Send,
{
impls::stream::ParStreamExt::par_map_init_unordered(self, config, init_f, f)
}
fn par_reduce<F, Fut>(
self,
limit: impl Into<Option<usize>>,
buf_size: impl Into<Option<usize>>,
f: F,
) -> ParReduce<Self::Item>
where
F: 'static + FnMut(Self::Item, Self::Item) -> Fut + Send,
Fut: 'static + Future<Output = Self::Item> + Send,
Self: 'static + StreamExt + Sized + Unpin + Send,
Self::Item: Send,
{
impls::stream::ParStreamExt::par_reduce(self, limit, buf_size, f)
}
fn par_routing<F1, F2, Fut, T>(
self,
buf_size: impl Into<Option<usize>>,
routing_fn: F1,
map_fns: Vec<F2>,
) -> ParRouting<T>
where
Self: 'static + StreamExt + Sized + Unpin + Send,
Self::Item: Send,
F1: 'static + FnMut(&Self::Item) -> usize + Send,
F2: 'static + FnMut(Self::Item) -> Fut + Send,
Fut: 'static + Future<Output = T> + Send,
T: 'static + Send,
{
impls::stream::ParStreamExt::par_routing(self, buf_size, routing_fn, map_fns)
}
fn par_routing_unordered<F1, F2, Fut, T>(
self,
buf_size: impl Into<Option<usize>>,
routing_fn: F1,
map_fns: Vec<F2>,
) -> ParRoutingUnordered<T>
where
F1: 'static + FnMut(&Self::Item) -> usize + Send,
F2: 'static + FnMut(Self::Item) -> Fut + Send,
Fut: 'static + Future<Output = T> + Send,
T: 'static + Send,
Self: 'static + StreamExt + Sized + Unpin + Send,
Self::Item: Send,
{
impls::stream::ParStreamExt::par_routing_unordered(self, buf_size, routing_fn, map_fns)
}
fn wrapping_enumerate<T>(self) -> WrappingEnumerate<T, Self>
where
Self: Stream<Item = T> + Sized + Unpin,
{
base::ParStreamExt::wrapping_enumerate(self)
}
fn reorder_enumerated<T>(self) -> ReorderEnumerated<T, Self>
where
Self: Stream<Item = (usize, T)> + Unpin + Sized,
{
base::ParStreamExt::reorder_enumerated(self)
}
fn par_scatter(
self,
buf_size: impl Into<Option<usize>>,
) -> (
Pin<Box<dyn Future<Output = ()>>>,
async_std::channel::Receiver<Self::Item>,
)
where
Self: 'static + StreamExt + Sized + Unpin,
{
impls::stream::ParStreamExt::par_scatter(self, buf_size)
}
fn par_for_each<F, Fut>(self, config: impl IntoParStreamParams, f: F) -> ParForEach
where
Self: 'static + Stream + Unpin + Sized + Send,
Self::Item: Send,
F: 'static + FnMut(Self::Item) -> Fut + Send,
Fut: 'static + Future<Output = ()> + Send,
{
impls::stream::ParStreamExt::par_for_each(self, config, f)
}
fn par_for_each_init<B, InitF, MapF, Fut>(
self,
config: impl IntoParStreamParams,
init_f: InitF,
map_f: MapF,
) -> ParForEach
where
Self: 'static + Stream + Unpin + Sized + Send,
Self::Item: Send,
B: 'static + Send + Clone,
InitF: FnMut() -> B,
MapF: 'static + FnMut(B, Self::Item) -> Fut + Send,
Fut: 'static + Future<Output = ()> + Send,
{
impls::stream::ParStreamExt::par_for_each_init(self, config, init_f, map_f)
}
fn par_for_each_blocking<F, Func>(self, config: impl IntoParStreamParams, f: F) -> ParForEach
where
Self: 'static + Stream + Unpin + Sized + Send,
Self::Item: Send,
F: 'static + FnMut(Self::Item) -> Func + Send,
Func: 'static + FnOnce() -> () + Send,
{
impls::stream::ParStreamExt::par_for_each_blocking(self, config, f)
}
fn par_for_each_blocking_init<B, InitF, MapF, Func>(
self,
config: impl IntoParStreamParams,
init_f: InitF,
f: MapF,
) -> ParForEach
where
Self: 'static + Stream + Unpin + Sized + Send,
Self::Item: Send,
B: 'static + Send + Clone,
InitF: FnMut() -> B,
MapF: 'static + FnMut(B, Self::Item) -> Func + Send,
Func: 'static + FnOnce() -> () + Send,
{
impls::stream::ParStreamExt::par_for_each_blocking_init(self, config, init_f, f)
}
}
/// Blanket implementation: every type that implements `Stream` gets the
/// `ParStreamExt` methods. The body is empty because the trait consists
/// solely of default methods.
impl<S: Stream> ParStreamExt for S {}
pub use impls::stream::ParMap;
pub use impls::stream::ParMapUnordered;
pub use impls::stream::ParReduce;
pub use impls::stream::ParRouting;
pub use impls::stream::ParRoutingUnordered;
pub use impls::stream::ParGather;
pub use base::WrappingEnumerate;
pub use base::ReorderEnumerated;
pub use impls::stream::ParForEach;