1#![allow(clippy::type_complexity)]
2
3use amadeus_streaming::{
4 HyperLogLogMagnitude, SampleUnstable as SASampleUnstable, Sort as SASort, Top
5};
6use derive_new::new;
7use rand::thread_rng;
8use serde::{Deserialize, Serialize};
9use serde_closure::traits;
10use std::{cmp::Ordering, hash::Hash};
11
12use super::{
13 folder_par_sink, FolderSync, FolderSyncReducer, ParallelPipe, ParallelSink, SumFolder, SumZeroFolder
14};
15
16#[derive(new)]
17#[must_use]
18pub struct SampleUnstable<P> {
19 pipe: P,
20 samples: usize,
21}
22
23impl_par_dist! {
24 impl<P: ParallelPipe<Item>, Item> ParallelSink<Item> for SampleUnstable<P>
25 where
26 P::Output: Send + 'static,
27 {
28 folder_par_sink!(
29 SampleUnstableFolder,
30 SumFolder<SASampleUnstable<P::Output>>,
31 self,
32 SampleUnstableFolder::new(self.samples),
33 SumFolder::new()
34 );
35 }
36}
37
38#[derive(Clone, Serialize, Deserialize, new)]
39pub struct SampleUnstableFolder {
40 samples: usize,
41}
42
43impl<Item> FolderSync<Item> for SampleUnstableFolder {
44 type State = SASampleUnstable<Item>;
45 type Done = Self::State;
46
47 fn zero(&mut self) -> Self::State {
48 SASampleUnstable::new(self.samples)
49 }
50 fn push(&mut self, state: &mut Self::State, item: Item) {
51 state.push(item, &mut thread_rng())
52 }
53 fn done(&mut self, state: Self::State) -> Self::Done {
54 state
55 }
56}
57
58#[derive(new)]
59#[must_use]
60pub struct Sort<P, F> {
61 pipe: P,
62 f: F,
63 n: usize,
64}
65
66impl_par_dist! {
67 #[cfg_attr(not(nightly), serde_closure::desugar)]
68 impl<P: ParallelPipe<Item>, F, Item> ParallelSink<Item> for Sort<P, F>
69 where
70 F: traits::Fn(&P::Output, &P::Output) -> Ordering + Clone + Send + 'static,
71 P::Output: Clone + Send + 'static,
72 {
73 folder_par_sink!(
74 SortFolder<F>,
75 SumZeroFolder<SASort<P::Output, F>>,
76 self,
77 SortFolder::new(self.f.clone(), self.n),
78 SumZeroFolder::new(SASort::new(self.f, self.n))
79 );
80 }
81}
82
83#[derive(Clone, Serialize, Deserialize, new)]
84pub struct SortFolder<F> {
85 f: F,
86 n: usize,
87}
88
89#[cfg_attr(not(nightly), serde_closure::desugar)]
90impl<Item, F> FolderSync<Item> for SortFolder<F>
91where
92 F: traits::Fn(&Item, &Item) -> Ordering + Clone,
93{
94 type State = SASort<Item, F>;
95 type Done = Self::State;
96
97 fn zero(&mut self) -> Self::Done {
98 SASort::new(self.f.clone(), self.n)
99 }
100 fn push(&mut self, state: &mut Self::Done, item: Item) {
101 state.push(item)
102 }
103 fn done(&mut self, state: Self::State) -> Self::Done {
104 state
105 }
106}
107
108#[derive(new)]
109#[must_use]
110pub struct MostFrequent<P> {
111 pipe: P,
112 n: usize,
113 probability: f64,
114 tolerance: f64,
115}
116
117impl_par_dist! {
118 impl<P: ParallelPipe<Item>, Item> ParallelSink<Item> for MostFrequent<P>
119 where
120 P::Output: Clone + Hash + Eq + Send + 'static,
121 {
122 folder_par_sink!(
123 MostFrequentFolder,
124 SumZeroFolder<Top<P::Output, usize>>,
125 self,
126 MostFrequentFolder::new(self.n, self.probability, self.tolerance),
127 SumZeroFolder::new(Top::new(self.n, self.probability, self.tolerance, ()))
128 );
129 }
130}
131
132#[derive(Clone, Serialize, Deserialize, new)]
133pub struct MostFrequentFolder {
134 n: usize,
135 probability: f64,
136 tolerance: f64,
137}
138
139impl<Item> FolderSync<Item> for MostFrequentFolder
140where
141 Item: Clone + Hash + Eq + Send + 'static,
142{
143 type State = Top<Item, usize>;
144 type Done = Self::State;
145
146 fn zero(&mut self) -> Self::State {
147 Top::new(self.n, self.probability, self.tolerance, ())
148 }
149 fn push(&mut self, state: &mut Self::State, item: Item) {
150 state.push(item, &1)
151 }
152 fn done(&mut self, state: Self::State) -> Self::Done {
153 state
154 }
155}
156
157#[derive(new)]
158#[must_use]
159pub struct MostDistinct<P> {
160 pipe: P,
161 n: usize,
162 probability: f64,
163 tolerance: f64,
164 error_rate: f64,
165}
166
167impl_par_dist! {
168 impl<P: ParallelPipe<Item, Output = (A, B)>, Item, A, B> ParallelSink<Item> for MostDistinct<P>
169 where
170 A: Clone + Hash + Eq + Send + 'static,
171 B: Hash + 'static,
172 {
173 folder_par_sink!(
174 MostDistinctFolder,
175 SumZeroFolder<Top<A, HyperLogLogMagnitude<B>>>,
176 self,
177 MostDistinctFolder::new(self.n, self.probability, self.tolerance, self.error_rate),
178 SumZeroFolder::new(Top::new(
179 self.n,
180 self.probability,
181 self.tolerance,
182 self.error_rate
183 ))
184 );
185 }
186}
187
188#[derive(Clone, Serialize, Deserialize, new)]
189pub struct MostDistinctFolder {
190 n: usize,
191 probability: f64,
192 tolerance: f64,
193 error_rate: f64,
194}
195
196impl<A, B> FolderSync<(A, B)> for MostDistinctFolder
197where
198 A: Clone + Hash + Eq + Send + 'static,
199 B: Hash + 'static,
200{
201 type State = Top<A, HyperLogLogMagnitude<B>>;
202 type Done = Self::State;
203
204 fn zero(&mut self) -> Self::State {
205 Top::new(self.n, self.probability, self.tolerance, self.error_rate)
206 }
207 fn push(&mut self, state: &mut Self::State, item: (A, B)) {
208 state.push(item.0, &item.1)
209 }
210 fn done(&mut self, state: Self::State) -> Self::Done {
211 state
212 }
213}