1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136
mod all; mod any; mod collect; mod combine; mod combiner; mod count; mod fold; mod folder; mod for_each; mod group_by; mod histogram; mod max; mod pipe; mod sample; mod sum; mod tuple; use futures::Stream; use std::{ ops::DerefMut, pin::Pin, task::{Context, Poll} }; use crate::pool::ProcessSend; use super::par_pipe::*; pub use self::{ all::*, any::*, collect::*, combine::*, combiner::*, count::*, fold::*, folder::*, for_each::*, group_by::*, histogram::*, max::*, pipe::*, sample::*, sum::*, tuple::* }; #[must_use] pub trait Reducer { type Item; type Output; type Async: ReducerAsync<Item = Self::Item, Output = Self::Output>; fn into_async(self) -> Self::Async; } #[must_use] pub trait ReducerAsync { type Item; type Output; fn poll_forward( self: Pin<&mut Self>, cx: &mut Context, stream: Pin<&mut impl Stream<Item = Self::Item>>, ) -> Poll<()>; fn poll_output(self: Pin<&mut Self>, cx: &mut Context) -> Poll<Self::Output>; } pub trait ReducerSend: Reducer<Output = <Self as ReducerSend>::Output> { type Output: Send + 'static; } pub trait ReducerProcessSend: ReducerSend<Output = <Self as ReducerProcessSend>::Output> { type Output: ProcessSend + 'static; } impl<P> ReducerAsync for Pin<P> where P: DerefMut + Unpin, P::Target: ReducerAsync, { type Item = <P::Target as ReducerAsync>::Item; type Output = <P::Target as ReducerAsync>::Output; fn poll_forward( self: Pin<&mut Self>, cx: &mut Context, stream: Pin<&mut impl Stream<Item = Self::Item>>, ) -> Poll<()> { self.get_mut().as_mut().poll_forward(cx, stream) } fn poll_output(self: Pin<&mut Self>, cx: &mut Context) -> Poll<Self::Output> { self.get_mut().as_mut().poll_output(cx) } } impl<T: ?Sized> ReducerAsync for &mut T where T: ReducerAsync + Unpin, { type Item = T::Item; type Output = T::Output; fn poll_forward( mut self: Pin<&mut Self>, cx: &mut Context, stream: Pin<&mut impl Stream<Item = Self::Item>>, ) -> Poll<()> { Pin::new(&mut **self).poll_forward(cx, stream) } fn poll_output(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<Self::Output> { Pin::new(&mut **self).poll_output(cx) } } pub trait Factory { type Item; fn make(&self) -> Self::Item; } #[must_use] pub trait DistributedSink<Source> { type Output; type Pipe: DistributedPipe<Source>; type ReduceAFactory: Factory<Item = Self::ReduceA> + Clone + ProcessSend; type ReduceBFactory: Factory<Item = Self::ReduceB>; type ReduceA: ReducerSend<Item = <Self::Pipe as DistributedPipe<Source>>::Item> + Send; type ReduceB: ReducerProcessSend<Item = <Self::ReduceA as Reducer>::Output> + ProcessSend; type ReduceC: Reducer<Item = <Self::ReduceB as Reducer>::Output, Output = Self::Output>; fn reducers( self, ) -> ( Self::Pipe, Self::ReduceAFactory, Self::ReduceBFactory, Self::ReduceC, ); } #[inline(always)] pub(crate) fn assert_distributed_sink<R: DistributedSink<Source>, Source>(r: R) -> R { r } #[must_use] pub trait ParallelSink<Source> { type Output; type Pipe: ParallelPipe<Source>; type ReduceAFactory: Factory<Item = Self::ReduceA>; type ReduceA: ReducerSend<Item = <Self::Pipe as ParallelPipe<Source>>::Item> + Send; type ReduceC: Reducer<Item = <Self::ReduceA as Reducer>::Output, Output = Self::Output>; fn reducers(self) -> (Self::Pipe, Self::ReduceAFactory, Self::ReduceC); } #[inline(always)] pub(crate) fn assert_parallel_sink<R: ParallelSink<Source>, Source>(r: R) -> R { r }