noir_compute/operator/window/aggr/
fold.rs

1use std::marker::PhantomData;
2
3use crate::operator::{Data, DataKey, Operator};
4use crate::stream::{KeyedStream, WindowedStream};
5
6use super::super::*;
7
8#[derive(Clone)]
9pub(crate) struct Fold<I, S, F>
10where
11    F: FnMut(&mut S, I),
12{
13    state: S,
14    f: F,
15    _in: PhantomData<I>,
16}
17
18impl<I, S, F> Fold<I, S, F>
19where
20    F: FnMut(&mut S, I),
21{
22    pub(crate) fn new(state: S, f: F) -> Self {
23        Self {
24            state,
25            f,
26            _in: PhantomData,
27        }
28    }
29}
30
31impl<I, S, F> WindowAccumulator for Fold<I, S, F>
32where
33    I: Clone + Send + 'static,
34    S: Clone + Send + 'static,
35    F: FnMut(&mut S, I) + Clone + Send + 'static,
36{
37    type In = I;
38
39    type Out = S;
40
41    fn process(&mut self, el: Self::In) {
42        (self.f)(&mut self.state, el);
43    }
44
45    fn output(self) -> Self::Out {
46        self.state
47    }
48}
49
50#[derive(Clone)]
51pub(crate) struct FoldFirst<I, F>
52where
53    F: FnMut(&mut I, I),
54{
55    state: Option<I>,
56    f: F,
57}
58
59impl<I, F> FoldFirst<I, F>
60where
61    F: FnMut(&mut I, I),
62{
63    pub(crate) fn new(f: F) -> Self {
64        Self { state: None, f }
65    }
66}
67
68impl<I, F> WindowAccumulator for FoldFirst<I, F>
69where
70    I: Clone + Send + 'static,
71    F: FnMut(&mut I, I) + Clone + Send + 'static,
72{
73    type In = I;
74    type Out = I;
75
76    #[inline]
77    fn process(&mut self, el: Self::In) {
78        match self.state.as_mut() {
79            None => self.state = Some(el),
80            Some(s) => (self.f)(s, el),
81        }
82    }
83
84    #[inline]
85    fn output(self) -> Self::Out {
86        self.state
87            .expect("FoldFirst output called when it has received no elements!")
88    }
89}
90
91impl<Key, Out, WindowDescr, OperatorChain> WindowedStream<OperatorChain, Out, WindowDescr>
92where
93    WindowDescr: WindowDescription<Out>,
94    OperatorChain: Operator<Out = (Key, Out)> + 'static,
95    Key: DataKey,
96    Out: Data,
97{
98    /// Folds the elements of each window into an accumulator value
99    ///
100    /// `fold()` takes two arguments: the initial value of the accumulator and a closure used to
101    /// accumulate the elements of each window.
102    ///
103    /// The closure is called once for each element of each window with two arguments: a mutable
104    /// reference to the accumulator and the element of the window. The closure should modify
105    /// the accumulator, without returning anything.
106    ///
107    /// ## Example
108    /// ```
109    /// # use noir_compute::{StreamContext, RuntimeConfig};
110    /// # use noir_compute::operator::source::IteratorSource;
111    /// # use noir_compute::operator::window::CountWindow;
112    /// # let mut env = StreamContext::new(RuntimeConfig::local(1));
113    /// let s = env.stream_iter(0..5);
114    /// let res = s
115    ///     .group_by(|&n| n % 2)
116    ///     .window(CountWindow::tumbling(2))
117    ///     .fold(1, |acc, n| *acc *= n)
118    ///     .collect_vec();
119    ///
120    /// env.execute_blocking();
121    ///
122    /// let mut res = res.get().unwrap();
123    /// res.sort_unstable();
124    /// assert_eq!(res, vec![(0, 0 * 2), (1, 1 * 3)]);
125    /// ```
126    pub fn fold<NewOut: Data, F>(
127        self,
128        init: NewOut,
129        fold: F,
130    ) -> KeyedStream<impl Operator<Out = (Key, NewOut)>>
131    where
132        F: FnMut(&mut NewOut, Out) + Clone + Send + 'static,
133    {
134        let acc = Fold::new(init, fold);
135        self.add_window_operator("WindowFold", acc)
136    }
137
138    /// Folds the elements of each window into an accumulator value, starting with the first value
139    ///
140    /// TODO DOCS
141    ///
142    pub fn fold_first<F>(self, fold: F) -> KeyedStream<impl Operator<Out = (Key, Out)>>
143    where
144        F: FnMut(&mut Out, Out) + Clone + Send + 'static,
145    {
146        let acc = FoldFirst::new(fold);
147        self.add_window_operator("WindowFoldFirst", acc)
148    }
149}