1
  2
  3
  4
  5
  6
  7
  8
  9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
mod all;
mod any;
mod collect;
mod combine;
mod combiner;
mod count;
mod fold;
mod folder;
mod for_each;
mod group_by;
mod histogram;
mod max;
mod pipe;
mod sample;
mod sum;
mod tuple;

use futures::Stream;
use std::{
	ops::DerefMut, pin::Pin, task::{Context, Poll}
};

use crate::pool::ProcessSend;

use super::par_pipe::*;

pub use self::{
	all::*, any::*, collect::*, combine::*, combiner::*, count::*, fold::*, folder::*, for_each::*, group_by::*, histogram::*, max::*, pipe::*, sample::*, sum::*, tuple::*
};

#[must_use]
pub trait Reducer {
	type Item;
	type Output;
	type Async: ReducerAsync<Item = Self::Item, Output = Self::Output>;

	fn into_async(self) -> Self::Async;
}
#[must_use]
pub trait ReducerAsync {
	type Item;
	type Output;

	fn poll_forward(
		self: Pin<&mut Self>, cx: &mut Context, stream: Pin<&mut impl Stream<Item = Self::Item>>,
	) -> Poll<()>;
	fn poll_output(self: Pin<&mut Self>, cx: &mut Context) -> Poll<Self::Output>;
}
pub trait ReducerSend: Reducer<Output = <Self as ReducerSend>::Output> {
	type Output: Send + 'static;
}
pub trait ReducerProcessSend: ReducerSend<Output = <Self as ReducerProcessSend>::Output> {
	type Output: ProcessSend + 'static;
}

impl<P> ReducerAsync for Pin<P>
where
	P: DerefMut + Unpin,
	P::Target: ReducerAsync,
{
	type Item = <P::Target as ReducerAsync>::Item;
	type Output = <P::Target as ReducerAsync>::Output;

	fn poll_forward(
		self: Pin<&mut Self>, cx: &mut Context, stream: Pin<&mut impl Stream<Item = Self::Item>>,
	) -> Poll<()> {
		self.get_mut().as_mut().poll_forward(cx, stream)
	}
	fn poll_output(self: Pin<&mut Self>, cx: &mut Context) -> Poll<Self::Output> {
		self.get_mut().as_mut().poll_output(cx)
	}
}
impl<T: ?Sized> ReducerAsync for &mut T
where
	T: ReducerAsync + Unpin,
{
	type Item = T::Item;
	type Output = T::Output;

	fn poll_forward(
		mut self: Pin<&mut Self>, cx: &mut Context,
		stream: Pin<&mut impl Stream<Item = Self::Item>>,
	) -> Poll<()> {
		Pin::new(&mut **self).poll_forward(cx, stream)
	}
	fn poll_output(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<Self::Output> {
		Pin::new(&mut **self).poll_output(cx)
	}
}

pub trait Factory {
	type Item;

	fn make(&self) -> Self::Item;
}

#[must_use]
pub trait DistributedSink<Source> {
	type Output;
	type Pipe: DistributedPipe<Source>;
	type ReduceAFactory: Factory<Item = Self::ReduceA> + Clone + ProcessSend;
	type ReduceBFactory: Factory<Item = Self::ReduceB>;
	type ReduceA: ReducerSend<Item = <Self::Pipe as DistributedPipe<Source>>::Item> + Send;
	type ReduceB: ReducerProcessSend<Item = <Self::ReduceA as Reducer>::Output> + ProcessSend;
	type ReduceC: Reducer<Item = <Self::ReduceB as Reducer>::Output, Output = Self::Output>;

	fn reducers(
		self,
	) -> (
		Self::Pipe,
		Self::ReduceAFactory,
		Self::ReduceBFactory,
		Self::ReduceC,
	);
}

#[inline(always)]
pub(crate) fn assert_distributed_sink<R: DistributedSink<Source>, Source>(r: R) -> R {
	r
}

#[must_use]
pub trait ParallelSink<Source> {
	type Output;
	type Pipe: ParallelPipe<Source>;
	type ReduceAFactory: Factory<Item = Self::ReduceA>;
	type ReduceA: ReducerSend<Item = <Self::Pipe as ParallelPipe<Source>>::Item> + Send;
	type ReduceC: Reducer<Item = <Self::ReduceA as Reducer>::Output, Output = Self::Output>;

	fn reducers(self) -> (Self::Pipe, Self::ReduceAFactory, Self::ReduceC);
}

#[inline(always)]
pub(crate) fn assert_parallel_sink<R: ParallelSink<Source>, Source>(r: R) -> R {
	r
}