mod all;
mod any;
mod collect;
mod combine;
mod combiner;
mod count;
mod fold;
mod folder;
mod for_each;
mod group_by;
mod max;
mod sample;
mod sum;
mod tuple;

use futures::Stream;
use std::{
    pin::Pin,
    task::{Context, Poll},
};

use crate::pool::ProcessSend;

use super::par_pipe::*;

pub use self::{
    all::*, any::*, collect::*, combine::*, combiner::*, count::*, fold::*, folder::*,
    for_each::*, group_by::*, max::*, sample::*, sum::*, tuple::*,
};

#[must_use]
pub trait Reducer {
    type Item;
    type Output;
    type Async: ReducerAsync<Item = Self::Item, Output = Self::Output>;

    fn into_async(self) -> Self::Async;
}

#[must_use]
pub trait ReducerAsync {
    type Item;
    type Output;

    fn poll_forward(
        self: Pin<&mut Self>,
        cx: &mut Context,
        stream: Pin<&mut impl Stream<Item = Self::Item>>,
    ) -> Poll<()>;

    fn poll_output(self: Pin<&mut Self>, cx: &mut Context) -> Poll<Self::Output>;
}

pub trait ReducerSend: Reducer<Output = <Self as ReducerSend>::Output> {
    type Output: Send + 'static;
}

pub trait ReducerProcessSend: ReducerSend<Output = <Self as ReducerProcessSend>::Output> {
    type Output: ProcessSend + 'static;
}

pub trait Factory {
    type Item;

    fn make(&self) -> Self::Item;
}

#[must_use]
pub trait DistributedSink<Source> {
    type Output;
    type Pipe: DistributedPipe<Source>;
    type ReduceAFactory: Factory<Item = Self::ReduceA> + Clone + ProcessSend;
    type ReduceBFactory: Factory<Item = Self::ReduceB>;
    type ReduceA: ReducerSend<Item = <Self::Pipe as DistributedPipe<Source>>::Item> + Send;
    type ReduceB: ReducerProcessSend<Item = <Self::ReduceA as Reducer>::Output> + ProcessSend;
    type ReduceC: Reducer<Item = <Self::ReduceB as Reducer>::Output, Output = Self::Output>;

    fn reducers(
        self,
    ) -> (
        Self::Pipe,
        Self::ReduceAFactory,
        Self::ReduceBFactory,
        Self::ReduceC,
    );
}

#[inline(always)]
pub(crate) fn assert_distributed_sink<R: DistributedSink<Source>, Source>(r: R) -> R {
    r
}

#[must_use]
pub trait ParallelSink<Source> {
    type Output;
    type Pipe: ParallelPipe<Source>;
    type ReduceAFactory: Factory<Item = Self::ReduceA>;
    type ReduceA: ReducerSend<Item = <Self::Pipe as ParallelPipe<Source>>::Item> + Send;
    type ReduceC: Reducer<Item = <Self::ReduceA as Reducer>::Output, Output = Self::Output>;

    fn reducers(self) -> (Self::Pipe, Self::ReduceAFactory, Self::ReduceC);
}

#[inline(always)]
pub(crate) fn assert_parallel_sink<R: ParallelSink<Source>, Source>(r: R) -> R {
    r
}