1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
use std::marker::PhantomData;
use crate::operator::{Data, DataKey, Operator};
use crate::stream::{KeyedStream, WindowedStream};
use super::super::*;
/// Window accumulator that folds the elements of a window into a running state.
///
/// The accumulator owns the current `state` together with the user supplied
/// closure `f`; the closure receives a mutable reference to the state and the
/// next element.
///
/// The element type `I` only appears in the closure signature, so it is
/// recorded through a zero-sized [`PhantomData`] marker instead of being stored.
#[derive(Clone)]
pub(crate) struct Fold<I, S, F: FnMut(&mut S, I)> {
    /// Value accumulated so far.
    state: S,
    /// Step closure, called with the state and one element.
    f: F,
    /// Marker that ties the otherwise unused element type `I` to the struct.
    _in: PhantomData<I>,
}
impl<I, S, F> Fold<I, S, F>
where
F: FnMut(&mut S, I),
{
pub(crate) fn new(state: S, f: F) -> Self {
Self {
state,
f,
_in: PhantomData,
}
}
}
impl<I, S, F> WindowAccumulator for Fold<I, S, F>
where
    I: Clone + Send + 'static,
    S: Clone + Send + 'static,
    F: FnMut(&mut S, I) + Clone + Send + 'static,
{
    /// Elements fed to the accumulator.
    type In = I;
    /// The folded state is what the accumulator yields.
    type Out = S;

    /// Applies the folding closure to the current state and `el`.
    fn process(&mut self, el: Self::In) {
        let Self { state, f, .. } = self;
        f(state, el);
    }

    /// Consumes the accumulator and returns the state built so far.
    fn output(self) -> Self::Out {
        let Self { state, .. } = self;
        state
    }
}
/// Window accumulator that folds the elements of a window, using the first
/// element received as the initial state.
///
/// The accumulated value therefore has the same type `I` as the elements.
#[derive(Clone)]
pub(crate) struct FoldFirst<I, F: FnMut(&mut I, I)> {
    /// `None` until an element has been stored, then the accumulated value.
    state: Option<I>,
    /// Step closure, called with the accumulated value and one element.
    f: F,
}
impl<I, F> FoldFirst<I, F>
where
F: FnMut(&mut I, I),
{
pub(crate) fn new(f: F) -> Self {
Self { state: None, f }
}
}
impl<I, F> WindowAccumulator for FoldFirst<I, F>
where
    I: Clone + Send + 'static,
    F: FnMut(&mut I, I) + Clone + Send + 'static,
{
    /// Elements fed to the accumulator.
    type In = I;
    /// The result has the same type as the elements.
    type Out = I;

    /// Stores `el` as the state if none is present yet, otherwise folds it
    /// into the existing state through the closure.
    #[inline]
    fn process(&mut self, el: Self::In) {
        if let Some(acc) = self.state.as_mut() {
            (self.f)(acc, el);
        } else {
            // Nothing stored yet: this element becomes the accumulator.
            self.state = Some(el);
        }
    }

    /// Consumes the accumulator and returns the folded value.
    ///
    /// # Panics
    ///
    /// Panics if no element was ever passed to `process`.
    #[inline]
    fn output(self) -> Self::Out {
        let Self { state, .. } = self;
        state.expect("FoldFirst output called when it has received no elements!")
    }
}
impl<Key, Out, WindowDescr, OperatorChain> WindowedStream<OperatorChain, Out, WindowDescr>
where
    WindowDescr: WindowDescription<Out>,
    OperatorChain: Operator<Out = (Key, Out)> + 'static,
    Key: DataKey,
    Out: Data,
{
    /// Folds the elements of each window into an accumulator value
    ///
    /// `fold()` takes two arguments: the initial value of the accumulator and a closure used to
    /// accumulate the elements of each window.
    ///
    /// The closure is called once for each element of each window with two arguments: a mutable
    /// reference to the accumulator and the element of the window. The closure should modify
    /// the accumulator, without returning anything.
    ///
    /// ## Example
    /// ```
    /// # use noir_compute::{StreamContext, RuntimeConfig};
    /// # use noir_compute::operator::source::IteratorSource;
    /// # use noir_compute::operator::window::CountWindow;
    /// # let mut env = StreamContext::new(RuntimeConfig::local(1));
    /// let s = env.stream_iter(0..5);
    /// let res = s
    ///     .group_by(|&n| n % 2)
    ///     .window(CountWindow::tumbling(2))
    ///     .fold(1, |acc, n| *acc *= n)
    ///     .collect_vec();
    ///
    /// env.execute_blocking();
    ///
    /// let mut res = res.get().unwrap();
    /// res.sort_unstable();
    /// assert_eq!(res, vec![(0, 0 * 2), (1, 1 * 3)]);
    /// ```
    pub fn fold<NewOut: Data, F>(
        self,
        init: NewOut,
        fold: F,
    ) -> KeyedStream<impl Operator<Out = (Key, NewOut)>>
    where
        F: FnMut(&mut NewOut, Out) + Clone + Send + 'static,
    {
        self.add_window_operator("WindowFold", Fold::new(init, fold))
    }

    /// Folds the elements of each window into an accumulator value, starting with the first value
    ///
    /// `fold_first()` takes a single argument: the closure used to accumulate the elements of
    /// each window. Unlike `fold()`, no initial value is provided: the first element of each
    /// window is used as the initial accumulator, so the result has the same type as the
    /// elements of the window.
    ///
    /// The closure is called once for each element of each window after the first one, with two
    /// arguments: a mutable reference to the accumulator and the element of the window. The
    /// closure should modify the accumulator, without returning anything.
    ///
    /// ## Example
    /// ```
    /// # use noir_compute::{StreamContext, RuntimeConfig};
    /// # use noir_compute::operator::source::IteratorSource;
    /// # use noir_compute::operator::window::CountWindow;
    /// # let mut env = StreamContext::new(RuntimeConfig::local(1));
    /// let s = env.stream_iter(0..5);
    /// let res = s
    ///     .group_by(|&n| n % 2)
    ///     .window(CountWindow::tumbling(2))
    ///     .fold_first(|acc, n| *acc += n)
    ///     .collect_vec();
    ///
    /// env.execute_blocking();
    ///
    /// let mut res = res.get().unwrap();
    /// res.sort_unstable();
    /// assert_eq!(res, vec![(0, 0 + 2), (1, 1 + 3)]);
    /// ```
    pub fn fold_first<F>(self, fold: F) -> KeyedStream<impl Operator<Out = (Key, Out)>>
    where
        F: FnMut(&mut Out, Out) + Clone + Send + 'static,
    {
        self.add_window_operator("WindowFoldFirst", FoldFirst::new(fold))
    }
}