amadeus_core/
par_sink.rs

1mod all;
2mod any;
3mod collect;
4mod combine;
5mod combiner;
6mod count;
7mod fold;
8mod folder;
9mod for_each;
10mod fork;
11mod group_by;
12mod histogram;
13mod max;
14mod mean;
15mod pipe;
16mod sample;
17mod stddev;
18mod sum;
19mod tuple;
20
21use super::par_pipe::*;
22use crate::{pipe::Sink, pool::ProcessSend};
23
24pub use self::{
25	all::*, any::*, collect::*, combine::*, combiner::*, count::*, fold::*, folder::*, for_each::*, fork::*, group_by::*, histogram::*, max::*, mean::*, pipe::*, sample::*, stddev::*, sum::*, tuple::*
26};
27
28#[must_use]
29pub trait Reducer<Item> {
30	type Done;
31	type Async: Sink<Item, Done = Self::Done>;
32
33	fn into_async(self) -> Self::Async;
34}
35pub trait ReducerSend<Item>: Reducer<Item, Done = <Self as ReducerSend<Item>>::Done> {
36	type Done: Send + 'static;
37}
38pub trait ReducerProcessSend<Item>:
39	ReducerSend<Item, Done = <Self as ReducerProcessSend<Item>>::Done>
40{
41	type Done: ProcessSend + 'static;
42}
43
44#[must_use]
45pub trait ParallelSink<Item> {
46	type Done;
47	type Pipe: ParallelPipe<Item>;
48	type ReduceA: ReducerSend<<Self::Pipe as ParallelPipe<Item>>::Output> + Clone + Send;
49	type ReduceC: Reducer<
50		<Self::ReduceA as ReducerSend<<Self::Pipe as ParallelPipe<Item>>::Output>>::Done,
51		Done = Self::Done,
52	>;
53
54	fn reducers(self) -> (Self::Pipe, Self::ReduceA, Self::ReduceC);
55}
56
57#[inline(always)]
58pub(crate) fn assert_parallel_sink<R: ParallelSink<Item>, Item>(r: R) -> R {
59	r
60}
61
62#[must_use]
63pub trait DistributedSink<Item> {
64	type Done;
65	type Pipe: DistributedPipe<Item>;
66	type ReduceA: ReducerSend<<Self::Pipe as DistributedPipe<Item>>::Output>
67		+ Clone
68		+ ProcessSend
69		+ Send;
70	type ReduceB: ReducerProcessSend<
71			<Self::ReduceA as ReducerSend<<Self::Pipe as DistributedPipe<Item>>::Output>>::Done,
72		> + Clone
73		+ ProcessSend;
74	type ReduceC: Reducer<
75		<Self::ReduceB as ReducerProcessSend<
76			<Self::ReduceA as ReducerSend<<Self::Pipe as DistributedPipe<Item>>::Output>>::Done,
77		>>::Done,
78		Done = Self::Done,
79	>;
80
81	fn reducers(self) -> (Self::Pipe, Self::ReduceA, Self::ReduceB, Self::ReduceC);
82}
83
84#[inline(always)]
85pub(crate) fn assert_distributed_sink<R: DistributedSink<Item>, Item>(r: R) -> R {
86	r
87}