noir_compute/operator/window/aggr/
fold.rs1use std::marker::PhantomData;
2
3use crate::operator::{Data, DataKey, Operator};
4use crate::stream::{KeyedStream, WindowedStream};
5
6use super::super::*;
7
8#[derive(Clone)]
9pub(crate) struct Fold<I, S, F>
10where
11 F: FnMut(&mut S, I),
12{
13 state: S,
14 f: F,
15 _in: PhantomData<I>,
16}
17
18impl<I, S, F> Fold<I, S, F>
19where
20 F: FnMut(&mut S, I),
21{
22 pub(crate) fn new(state: S, f: F) -> Self {
23 Self {
24 state,
25 f,
26 _in: PhantomData,
27 }
28 }
29}
30
31impl<I, S, F> WindowAccumulator for Fold<I, S, F>
32where
33 I: Clone + Send + 'static,
34 S: Clone + Send + 'static,
35 F: FnMut(&mut S, I) + Clone + Send + 'static,
36{
37 type In = I;
38
39 type Out = S;
40
41 fn process(&mut self, el: Self::In) {
42 (self.f)(&mut self.state, el);
43 }
44
45 fn output(self) -> Self::Out {
46 self.state
47 }
48}
49
50#[derive(Clone)]
51pub(crate) struct FoldFirst<I, F>
52where
53 F: FnMut(&mut I, I),
54{
55 state: Option<I>,
56 f: F,
57}
58
59impl<I, F> FoldFirst<I, F>
60where
61 F: FnMut(&mut I, I),
62{
63 pub(crate) fn new(f: F) -> Self {
64 Self { state: None, f }
65 }
66}
67
68impl<I, F> WindowAccumulator for FoldFirst<I, F>
69where
70 I: Clone + Send + 'static,
71 F: FnMut(&mut I, I) + Clone + Send + 'static,
72{
73 type In = I;
74 type Out = I;
75
76 #[inline]
77 fn process(&mut self, el: Self::In) {
78 match self.state.as_mut() {
79 None => self.state = Some(el),
80 Some(s) => (self.f)(s, el),
81 }
82 }
83
84 #[inline]
85 fn output(self) -> Self::Out {
86 self.state
87 .expect("FoldFirst output called when it has received no elements!")
88 }
89}
90
91impl<Key, Out, WindowDescr, OperatorChain> WindowedStream<OperatorChain, Out, WindowDescr>
92where
93 WindowDescr: WindowDescription<Out>,
94 OperatorChain: Operator<Out = (Key, Out)> + 'static,
95 Key: DataKey,
96 Out: Data,
97{
98 pub fn fold<NewOut: Data, F>(
127 self,
128 init: NewOut,
129 fold: F,
130 ) -> KeyedStream<impl Operator<Out = (Key, NewOut)>>
131 where
132 F: FnMut(&mut NewOut, Out) + Clone + Send + 'static,
133 {
134 let acc = Fold::new(init, fold);
135 self.add_window_operator("WindowFold", acc)
136 }
137
138 pub fn fold_first<F>(self, fold: F) -> KeyedStream<impl Operator<Out = (Key, Out)>>
143 where
144 F: FnMut(&mut Out, Out) + Clone + Send + 'static,
145 {
146 let acc = FoldFirst::new(fold);
147 self.add_window_operator("WindowFoldFirst", acc)
148 }
149}