1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
mod all;
mod any;
mod collect;
mod combine;
mod count;
mod fold;
mod for_each;
mod max;
mod sample;
mod sum;
mod tuple;

use futures::Stream;
use std::{
	pin::Pin, task::{Context, Poll}
};

use crate::pool::ProcessSend;

use super::dist_pipe::*;

pub use self::{
	all::*, any::*, collect::*, combine::*, count::*, fold::*, for_each::*, max::*, sample::*, sum::*, tuple::*
};

#[must_use]
pub trait Reducer {
	type Item;
	type Output;
	type Async: ReducerAsync<Item = Self::Item, Output = Self::Output>;

	fn into_async(self) -> Self::Async;
}
#[must_use]
pub trait ReducerAsync {
	type Item;
	type Output;

	fn poll_forward(
		self: Pin<&mut Self>, cx: &mut Context, stream: Pin<&mut impl Stream<Item = Self::Item>>,
	) -> Poll<()>;
	fn poll_output(self: Pin<&mut Self>, cx: &mut Context) -> Poll<Self::Output>;
}
pub trait ReducerSend: Reducer<Output = <Self as ReducerSend>::Output> {
	type Output: Send + 'static;
}
pub trait ReducerProcessSend: ReducerSend<Output = <Self as ReducerProcessSend>::Output> {
	type Output: ProcessSend;
}

pub trait Factory {
	type Item;

	fn make(&self) -> Self::Item;
}

#[must_use]
pub trait DistributedSink<I: DistributedPipe<Source>, Source, B> {
	type ReduceAFactory: Factory<Item = Self::ReduceA> + Clone + ProcessSend;
	type ReduceBFactory: Factory<Item = Self::ReduceB>;
	type ReduceA: ReducerSend<Item = <I as DistributedPipe<Source>>::Item> + ProcessSend;
	type ReduceB: ReducerProcessSend<Item = <Self::ReduceA as Reducer>::Output> + ProcessSend;
	type ReduceC: Reducer<Item = <Self::ReduceB as Reducer>::Output, Output = B>;

	fn reducers(self) -> (I, Self::ReduceAFactory, Self::ReduceBFactory, Self::ReduceC);
}

#[inline(always)]
pub(crate) fn assert_distributed_sink<
	T,
	R: DistributedSink<I, Source, T>,
	I: DistributedPipe<Source>,
	Source,
>(
	r: R,
) -> R {
	r
}