// amadeus_core/par_sink/sample.rs

1#![allow(clippy::type_complexity)]
2
3use amadeus_streaming::{
4	HyperLogLogMagnitude, SampleUnstable as SASampleUnstable, Sort as SASort, Top
5};
6use derive_new::new;
7use rand::thread_rng;
8use serde::{Deserialize, Serialize};
9use serde_closure::traits;
10use std::{cmp::Ordering, hash::Hash};
11
12use super::{
13	folder_par_sink, FolderSync, FolderSyncReducer, ParallelPipe, ParallelSink, SumFolder, SumZeroFolder
14};
15
16#[derive(new)]
17#[must_use]
18pub struct SampleUnstable<P> {
19	pipe: P,
20	samples: usize,
21}
22
23impl_par_dist! {
24	impl<P: ParallelPipe<Item>, Item> ParallelSink<Item> for SampleUnstable<P>
25	where
26		P::Output: Send + 'static,
27	{
28		folder_par_sink!(
29			SampleUnstableFolder,
30			SumFolder<SASampleUnstable<P::Output>>,
31			self,
32			SampleUnstableFolder::new(self.samples),
33			SumFolder::new()
34		);
35	}
36}
37
38#[derive(Clone, Serialize, Deserialize, new)]
39pub struct SampleUnstableFolder {
40	samples: usize,
41}
42
43impl<Item> FolderSync<Item> for SampleUnstableFolder {
44	type State = SASampleUnstable<Item>;
45	type Done = Self::State;
46
47	fn zero(&mut self) -> Self::State {
48		SASampleUnstable::new(self.samples)
49	}
50	fn push(&mut self, state: &mut Self::State, item: Item) {
51		state.push(item, &mut thread_rng())
52	}
53	fn done(&mut self, state: Self::State) -> Self::Done {
54		state
55	}
56}
57
58#[derive(new)]
59#[must_use]
60pub struct Sort<P, F> {
61	pipe: P,
62	f: F,
63	n: usize,
64}
65
66impl_par_dist! {
67	#[cfg_attr(not(nightly), serde_closure::desugar)]
68	impl<P: ParallelPipe<Item>, F, Item> ParallelSink<Item> for Sort<P, F>
69	where
70		F: traits::Fn(&P::Output, &P::Output) -> Ordering + Clone + Send + 'static,
71		P::Output: Clone + Send + 'static,
72	{
73		folder_par_sink!(
74			SortFolder<F>,
75			SumZeroFolder<SASort<P::Output, F>>,
76			self,
77			SortFolder::new(self.f.clone(), self.n),
78			SumZeroFolder::new(SASort::new(self.f, self.n))
79		);
80	}
81}
82
83#[derive(Clone, Serialize, Deserialize, new)]
84pub struct SortFolder<F> {
85	f: F,
86	n: usize,
87}
88
89#[cfg_attr(not(nightly), serde_closure::desugar)]
90impl<Item, F> FolderSync<Item> for SortFolder<F>
91where
92	F: traits::Fn(&Item, &Item) -> Ordering + Clone,
93{
94	type State = SASort<Item, F>;
95	type Done = Self::State;
96
97	fn zero(&mut self) -> Self::Done {
98		SASort::new(self.f.clone(), self.n)
99	}
100	fn push(&mut self, state: &mut Self::Done, item: Item) {
101		state.push(item)
102	}
103	fn done(&mut self, state: Self::State) -> Self::Done {
104		state
105	}
106}
107
108#[derive(new)]
109#[must_use]
110pub struct MostFrequent<P> {
111	pipe: P,
112	n: usize,
113	probability: f64,
114	tolerance: f64,
115}
116
117impl_par_dist! {
118	impl<P: ParallelPipe<Item>, Item> ParallelSink<Item> for MostFrequent<P>
119	where
120		P::Output: Clone + Hash + Eq + Send + 'static,
121	{
122		folder_par_sink!(
123			MostFrequentFolder,
124			SumZeroFolder<Top<P::Output, usize>>,
125			self,
126			MostFrequentFolder::new(self.n, self.probability, self.tolerance),
127			SumZeroFolder::new(Top::new(self.n, self.probability, self.tolerance, ()))
128		);
129	}
130}
131
132#[derive(Clone, Serialize, Deserialize, new)]
133pub struct MostFrequentFolder {
134	n: usize,
135	probability: f64,
136	tolerance: f64,
137}
138
139impl<Item> FolderSync<Item> for MostFrequentFolder
140where
141	Item: Clone + Hash + Eq + Send + 'static,
142{
143	type State = Top<Item, usize>;
144	type Done = Self::State;
145
146	fn zero(&mut self) -> Self::State {
147		Top::new(self.n, self.probability, self.tolerance, ())
148	}
149	fn push(&mut self, state: &mut Self::State, item: Item) {
150		state.push(item, &1)
151	}
152	fn done(&mut self, state: Self::State) -> Self::Done {
153		state
154	}
155}
156
157#[derive(new)]
158#[must_use]
159pub struct MostDistinct<P> {
160	pipe: P,
161	n: usize,
162	probability: f64,
163	tolerance: f64,
164	error_rate: f64,
165}
166
167impl_par_dist! {
168	impl<P: ParallelPipe<Item, Output = (A, B)>, Item, A, B> ParallelSink<Item> for MostDistinct<P>
169	where
170		A: Clone + Hash + Eq + Send + 'static,
171		B: Hash + 'static,
172	{
173		folder_par_sink!(
174			MostDistinctFolder,
175			SumZeroFolder<Top<A, HyperLogLogMagnitude<B>>>,
176			self,
177			MostDistinctFolder::new(self.n, self.probability, self.tolerance, self.error_rate),
178			SumZeroFolder::new(Top::new(
179				self.n,
180				self.probability,
181				self.tolerance,
182				self.error_rate
183			))
184		);
185	}
186}
187
188#[derive(Clone, Serialize, Deserialize, new)]
189pub struct MostDistinctFolder {
190	n: usize,
191	probability: f64,
192	tolerance: f64,
193	error_rate: f64,
194}
195
196impl<A, B> FolderSync<(A, B)> for MostDistinctFolder
197where
198	A: Clone + Hash + Eq + Send + 'static,
199	B: Hash + 'static,
200{
201	type State = Top<A, HyperLogLogMagnitude<B>>;
202	type Done = Self::State;
203
204	fn zero(&mut self) -> Self::State {
205		Top::new(self.n, self.probability, self.tolerance, self.error_rate)
206	}
207	fn push(&mut self, state: &mut Self::State, item: (A, B)) {
208		state.push(item.0, &item.1)
209	}
210	fn done(&mut self, state: Self::State) -> Self::Done {
211		state
212	}
213}