amadeus_core/par_sink/
max.rs

1use derive_new::new;
2use educe::Educe;
3use serde::{Deserialize, Serialize};
4use serde_closure::traits::FnMut;
5use std::{cmp::Ordering, marker::PhantomData};
6
7use super::{combiner_par_sink, CombinerSync, FolderSyncReducer, ParallelPipe, ParallelSink};
8
9#[derive(new)]
10#[must_use]
11pub struct Max<P> {
12	pipe: P,
13}
14impl_par_dist! {
15	impl<P: ParallelPipe<Item>, Item> ParallelSink<Item> for Max<P>
16	where
17		P::Output: Ord + Send + 'static,
18	{
19		combiner_par_sink!(combine::Max<P::Output>, self, combine::Max::new());
20	}
21}
22
23#[derive(new)]
24#[must_use]
25pub struct MaxBy<P, F> {
26	pipe: P,
27	f: F,
28}
29impl_par_dist! {
30	impl<P: ParallelPipe<Item>, Item, F> ParallelSink<Item> for MaxBy<P, F>
31	where
32		F: for<'a, 'b> FnMut<(&'a P::Output, &'b P::Output), Output = Ordering>
33			+ Clone
34			+ Send
35			+ 'static,
36		P::Output: Send + 'static,
37	{
38		combiner_par_sink!(combine::MaxBy<P::Output,F>, self, combine::MaxBy::new(self.f));
39	}
40}
41
42#[derive(new)]
43#[must_use]
44pub struct MaxByKey<P, F> {
45	pipe: P,
46	f: F,
47}
48impl_par_dist! {
49	impl<P: ParallelPipe<Item>, Item, F, B> ParallelSink<Item> for MaxByKey<P, F>
50	where
51		F: for<'a> FnMut<(&'a P::Output,), Output = B> + Clone + Send + 'static,
52		B: Ord + 'static,
53		P::Output: Send + 'static,
54	{
55		combiner_par_sink!(combine::MaxByKey<P::Output,F, B>, self, combine::MaxByKey::new(self.f));
56	}
57}
58
59#[derive(new)]
60#[must_use]
61pub struct Min<P> {
62	pipe: P,
63}
64impl_par_dist! {
65	impl<P: ParallelPipe<Item>, Item> ParallelSink<Item> for Min<P>
66	where
67		P::Output: Ord + Send + 'static,
68	{
69		combiner_par_sink!(combine::Min<P::Output>, self, combine::Min::new());
70	}
71}
72
73#[derive(new)]
74#[must_use]
75pub struct MinBy<P, F> {
76	pipe: P,
77	f: F,
78}
79impl_par_dist! {
80	impl<P: ParallelPipe<Item>, Item, F> ParallelSink<Item> for MinBy<P, F>
81	where
82		F: for<'a, 'b> FnMut<(&'a P::Output, &'b P::Output), Output = Ordering>
83			+ Clone
84			+ Send
85			+ 'static,
86		P::Output: Send + 'static,
87	{
88		combiner_par_sink!(combine::MinBy<P::Output,F>, self, combine::MinBy::new(self.f));
89	}
90}
91
92#[derive(new)]
93#[must_use]
94pub struct MinByKey<P, F> {
95	pipe: P,
96	f: F,
97}
98impl_par_dist! {
99	impl<P: ParallelPipe<Item>, Item, F, B> ParallelSink<Item> for MinByKey<P, F>
100	where
101		F: for<'a> FnMut<(&'a P::Output,), Output = B> + Clone + Send + 'static,
102		B: Ord + 'static,
103		P::Output: Send + 'static,
104	{
105		combiner_par_sink!(combine::MinByKey<P::Output,F, B>, self, combine::MinByKey::new(self.f));
106	}
107}
108
109mod combine {
110	use super::*;
111
112	#[derive(Educe, Serialize, Deserialize, new)]
113	#[educe(Clone)]
114	#[serde(bound = "")]
115	pub struct Max<A>(PhantomData<fn() -> A>);
116	impl<A: Ord> CombinerSync for Max<A> {
117		type Done = A;
118
119		fn combine(&mut self, a: A, b: A) -> A {
120			// switch to b even if it is only equal, to preserve stability.
121			if a.cmp(&b) != Ordering::Greater {
122				b
123			} else {
124				a
125			}
126		}
127	}
128
129	#[derive(Educe, Serialize, Deserialize, new)]
130	#[educe(Clone(bound = "F: Clone"))]
131	#[serde(
132		bound(serialize = "F: Serialize"),
133		bound(deserialize = "F: Deserialize<'de>")
134	)]
135	pub struct MaxBy<A, F>(pub F, PhantomData<fn() -> A>);
136	impl<A, F: for<'a, 'b> FnMut<(&'a A, &'b A), Output = Ordering>> CombinerSync for MaxBy<A, F> {
137		type Done = A;
138
139		fn combine(&mut self, a: A, b: A) -> A {
140			if self.0.call_mut((&a, &b)) != Ordering::Greater {
141				b
142			} else {
143				a
144			}
145		}
146	}
147
148	#[derive(Educe, Serialize, Deserialize, new)]
149	#[educe(Clone(bound = "F: Clone"))]
150	#[serde(
151		bound(serialize = "F: Serialize"),
152		bound(deserialize = "F: Deserialize<'de>")
153	)]
154	pub struct MaxByKey<A, F, B>(pub F, pub PhantomData<fn(A, B)>);
155	impl<A, F: for<'a> FnMut<(&'a A,), Output = B>, B: Ord> CombinerSync for MaxByKey<A, F, B> {
156		type Done = A;
157
158		fn combine(&mut self, a: A, b: A) -> A {
159			if self.0.call_mut((&a,)).cmp(&self.0.call_mut((&b,))) != Ordering::Greater {
160				b
161			} else {
162				a
163			}
164		}
165	}
166
167	#[derive(Educe, Serialize, Deserialize, new)]
168	#[educe(Clone)]
169	#[serde(bound = "")]
170	pub struct Min<A>(PhantomData<fn() -> A>);
171	impl<A: Ord> CombinerSync for Min<A> {
172		type Done = A;
173
174		fn combine(&mut self, a: A, b: A) -> A {
175			// switch to b even if it is strictly smaller, to preserve stability.
176			if a.cmp(&b) == Ordering::Greater {
177				b
178			} else {
179				a
180			}
181		}
182	}
183
184	#[derive(Educe, Serialize, Deserialize, new)]
185	#[educe(Clone(bound = "F: Clone"))]
186	#[serde(
187		bound(serialize = "F: Serialize"),
188		bound(deserialize = "F: Deserialize<'de>")
189	)]
190	pub struct MinBy<A, F>(pub F, PhantomData<fn() -> A>);
191	impl<A, F: for<'a, 'b> FnMut<(&'a A, &'b A), Output = Ordering>> CombinerSync for MinBy<A, F> {
192		type Done = A;
193
194		fn combine(&mut self, a: A, b: A) -> A {
195			if self.0.call_mut((&a, &b)) == Ordering::Greater {
196				b
197			} else {
198				a
199			}
200		}
201	}
202
203	#[derive(Educe, Serialize, Deserialize, new)]
204	#[educe(Clone(bound = "F: Clone"))]
205	#[serde(
206		bound(serialize = "F: Serialize"),
207		bound(deserialize = "F: Deserialize<'de>")
208	)]
209	pub struct MinByKey<A, F, B>(pub F, pub PhantomData<fn(A, B)>);
210	impl<A, F: for<'a> FnMut<(&'a A,), Output = B>, B: Ord> CombinerSync for MinByKey<A, F, B> {
211		type Done = A;
212
213		fn combine(&mut self, a: A, b: A) -> A {
214			if self.0.call_mut((&a,)).cmp(&self.0.call_mut((&b,))) == Ordering::Greater {
215				b
216			} else {
217				a
218			}
219		}
220	}
221}