1use derive_new::new;
2use educe::Educe;
3use serde::{Deserialize, Serialize};
4use serde_closure::traits::FnMut;
5use std::{cmp::Ordering, marker::PhantomData};
6
7use super::{combiner_par_sink, CombinerSync, FolderSyncReducer, ParallelPipe, ParallelSink};
8
9#[derive(new)]
10#[must_use]
11pub struct Max<P> {
12 pipe: P,
13}
14impl_par_dist! {
15 impl<P: ParallelPipe<Item>, Item> ParallelSink<Item> for Max<P>
16 where
17 P::Output: Ord + Send + 'static,
18 {
19 combiner_par_sink!(combine::Max<P::Output>, self, combine::Max::new());
20 }
21}
22
23#[derive(new)]
24#[must_use]
25pub struct MaxBy<P, F> {
26 pipe: P,
27 f: F,
28}
29impl_par_dist! {
30 impl<P: ParallelPipe<Item>, Item, F> ParallelSink<Item> for MaxBy<P, F>
31 where
32 F: for<'a, 'b> FnMut<(&'a P::Output, &'b P::Output), Output = Ordering>
33 + Clone
34 + Send
35 + 'static,
36 P::Output: Send + 'static,
37 {
38 combiner_par_sink!(combine::MaxBy<P::Output,F>, self, combine::MaxBy::new(self.f));
39 }
40}
41
42#[derive(new)]
43#[must_use]
44pub struct MaxByKey<P, F> {
45 pipe: P,
46 f: F,
47}
48impl_par_dist! {
49 impl<P: ParallelPipe<Item>, Item, F, B> ParallelSink<Item> for MaxByKey<P, F>
50 where
51 F: for<'a> FnMut<(&'a P::Output,), Output = B> + Clone + Send + 'static,
52 B: Ord + 'static,
53 P::Output: Send + 'static,
54 {
55 combiner_par_sink!(combine::MaxByKey<P::Output,F, B>, self, combine::MaxByKey::new(self.f));
56 }
57}
58
59#[derive(new)]
60#[must_use]
61pub struct Min<P> {
62 pipe: P,
63}
64impl_par_dist! {
65 impl<P: ParallelPipe<Item>, Item> ParallelSink<Item> for Min<P>
66 where
67 P::Output: Ord + Send + 'static,
68 {
69 combiner_par_sink!(combine::Min<P::Output>, self, combine::Min::new());
70 }
71}
72
73#[derive(new)]
74#[must_use]
75pub struct MinBy<P, F> {
76 pipe: P,
77 f: F,
78}
79impl_par_dist! {
80 impl<P: ParallelPipe<Item>, Item, F> ParallelSink<Item> for MinBy<P, F>
81 where
82 F: for<'a, 'b> FnMut<(&'a P::Output, &'b P::Output), Output = Ordering>
83 + Clone
84 + Send
85 + 'static,
86 P::Output: Send + 'static,
87 {
88 combiner_par_sink!(combine::MinBy<P::Output,F>, self, combine::MinBy::new(self.f));
89 }
90}
91
92#[derive(new)]
93#[must_use]
94pub struct MinByKey<P, F> {
95 pipe: P,
96 f: F,
97}
98impl_par_dist! {
99 impl<P: ParallelPipe<Item>, Item, F, B> ParallelSink<Item> for MinByKey<P, F>
100 where
101 F: for<'a> FnMut<(&'a P::Output,), Output = B> + Clone + Send + 'static,
102 B: Ord + 'static,
103 P::Output: Send + 'static,
104 {
105 combiner_par_sink!(combine::MinByKey<P::Output,F, B>, self, combine::MinByKey::new(self.f));
106 }
107}
108
109mod combine {
110 use super::*;
111
112 #[derive(Educe, Serialize, Deserialize, new)]
113 #[educe(Clone)]
114 #[serde(bound = "")]
115 pub struct Max<A>(PhantomData<fn() -> A>);
116 impl<A: Ord> CombinerSync for Max<A> {
117 type Done = A;
118
119 fn combine(&mut self, a: A, b: A) -> A {
120 if a.cmp(&b) != Ordering::Greater {
122 b
123 } else {
124 a
125 }
126 }
127 }
128
129 #[derive(Educe, Serialize, Deserialize, new)]
130 #[educe(Clone(bound = "F: Clone"))]
131 #[serde(
132 bound(serialize = "F: Serialize"),
133 bound(deserialize = "F: Deserialize<'de>")
134 )]
135 pub struct MaxBy<A, F>(pub F, PhantomData<fn() -> A>);
136 impl<A, F: for<'a, 'b> FnMut<(&'a A, &'b A), Output = Ordering>> CombinerSync for MaxBy<A, F> {
137 type Done = A;
138
139 fn combine(&mut self, a: A, b: A) -> A {
140 if self.0.call_mut((&a, &b)) != Ordering::Greater {
141 b
142 } else {
143 a
144 }
145 }
146 }
147
148 #[derive(Educe, Serialize, Deserialize, new)]
149 #[educe(Clone(bound = "F: Clone"))]
150 #[serde(
151 bound(serialize = "F: Serialize"),
152 bound(deserialize = "F: Deserialize<'de>")
153 )]
154 pub struct MaxByKey<A, F, B>(pub F, pub PhantomData<fn(A, B)>);
155 impl<A, F: for<'a> FnMut<(&'a A,), Output = B>, B: Ord> CombinerSync for MaxByKey<A, F, B> {
156 type Done = A;
157
158 fn combine(&mut self, a: A, b: A) -> A {
159 if self.0.call_mut((&a,)).cmp(&self.0.call_mut((&b,))) != Ordering::Greater {
160 b
161 } else {
162 a
163 }
164 }
165 }
166
167 #[derive(Educe, Serialize, Deserialize, new)]
168 #[educe(Clone)]
169 #[serde(bound = "")]
170 pub struct Min<A>(PhantomData<fn() -> A>);
171 impl<A: Ord> CombinerSync for Min<A> {
172 type Done = A;
173
174 fn combine(&mut self, a: A, b: A) -> A {
175 if a.cmp(&b) == Ordering::Greater {
177 b
178 } else {
179 a
180 }
181 }
182 }
183
184 #[derive(Educe, Serialize, Deserialize, new)]
185 #[educe(Clone(bound = "F: Clone"))]
186 #[serde(
187 bound(serialize = "F: Serialize"),
188 bound(deserialize = "F: Deserialize<'de>")
189 )]
190 pub struct MinBy<A, F>(pub F, PhantomData<fn() -> A>);
191 impl<A, F: for<'a, 'b> FnMut<(&'a A, &'b A), Output = Ordering>> CombinerSync for MinBy<A, F> {
192 type Done = A;
193
194 fn combine(&mut self, a: A, b: A) -> A {
195 if self.0.call_mut((&a, &b)) == Ordering::Greater {
196 b
197 } else {
198 a
199 }
200 }
201 }
202
203 #[derive(Educe, Serialize, Deserialize, new)]
204 #[educe(Clone(bound = "F: Clone"))]
205 #[serde(
206 bound(serialize = "F: Serialize"),
207 bound(deserialize = "F: Deserialize<'de>")
208 )]
209 pub struct MinByKey<A, F, B>(pub F, pub PhantomData<fn(A, B)>);
210 impl<A, F: for<'a> FnMut<(&'a A,), Output = B>, B: Ord> CombinerSync for MinByKey<A, F, B> {
211 type Done = A;
212
213 fn combine(&mut self, a: A, b: A) -> A {
214 if self.0.call_mut((&a,)).cmp(&self.0.call_mut((&b,))) == Ordering::Greater {
215 b
216 } else {
217 a
218 }
219 }
220 }
221}