noir_compute/operator/window/aggr/
max.rs1use std::cmp::Ordering;
2
3use super::{super::*, FoldFirst};
4use crate::operator::{Data, DataKey, Operator};
5use crate::stream::{KeyedStream, WindowedStream};
6
7impl<Key, Out, WindowDescr, OperatorChain> WindowedStream<OperatorChain, Out, WindowDescr>
8where
9 WindowDescr: WindowDescription<Out>,
10 OperatorChain: Operator<Out = (Key, Out)> + 'static,
11 Key: DataKey,
12 Out: Data + Ord,
13{
14 pub fn max(self) -> KeyedStream<impl Operator<Out = (Key, Out)>> {
15 let acc = FoldFirst::<Out, _>::new(|max, x| {
16 if x > *max {
17 *max = x
18 }
19 });
20 self.add_window_operator("WindowMax", acc)
21 }
22}
23
24impl<Key, Out, WindowDescr, OperatorChain> WindowedStream<OperatorChain, Out, WindowDescr>
25where
26 WindowDescr: WindowDescription<Out>,
27 OperatorChain: Operator<Out = (Key, Out)> + 'static,
28 Key: DataKey,
29 Out: Data,
30{
31 pub fn max_by_key<K: Ord, F: Fn(&Out) -> K + Clone + Send + 'static>(
32 self,
33 get_key: F,
34 ) -> KeyedStream<impl Operator<Out = (Key, Out)>> {
35 let acc = FoldFirst::<Out, _>::new(move |max, x| {
36 if (get_key)(&x) > (get_key)(max) {
37 *max = x
38 }
39 });
40 self.add_window_operator("WindowMax", acc)
41 }
42
43 pub fn max_by<F: Fn(&Out, &Out) -> Ordering + Clone + Send + 'static>(
44 self,
45 compare: F,
46 ) -> KeyedStream<impl Operator<Out = (Key, Out)>> {
47 let acc = FoldFirst::<Out, _>::new(move |max, x| {
48 if (compare)(&x, max).is_gt() {
49 *max = x
50 }
51 });
52 self.add_window_operator("WindowMax", acc)
53 }
54}