noir_compute/operator/window/aggr/
collect_vec.rs1use super::super::*;
2use crate::operator::{Data, DataKey, Operator};
3use crate::stream::{KeyedStream, WindowedStream};
4
5#[derive(Clone)]
6struct CollectVec<I, O, F>
7where
8 F: Fn(Vec<I>) -> O,
9{
10 vec: Vec<I>,
11 f: F,
12 _o: PhantomData<O>,
13}
14
15impl<I, O, F> WindowAccumulator for CollectVec<I, O, F>
16where
17 F: Fn(Vec<I>) -> O + Send + Clone + 'static,
18 I: Clone + Send + 'static,
19 O: Clone + Send + 'static,
20{
21 type In = I;
22
23 type Out = O;
24
25 #[inline]
26 fn process(&mut self, el: Self::In) {
27 self.vec.push(el);
28 }
29
30 #[inline]
31 fn output(self) -> Self::Out {
32 (self.f)(self.vec)
33 }
34}
35
36impl<Key, Out, WindowDescr, OperatorChain> WindowedStream<OperatorChain, Out, WindowDescr>
37where
38 WindowDescr: WindowDescription<Out>,
39 OperatorChain: Operator<Out = (Key, Out)> + 'static,
40 Key: DataKey,
41 Out: Data + Ord,
42{
43 pub fn map<NewOut: Data, F: Fn(Vec<Out>) -> NewOut + Send + Clone + 'static>(
45 self,
46 f: F,
47 ) -> KeyedStream<impl Operator<Out = (Key, NewOut)>> {
48 let acc = CollectVec::<Out, NewOut, _> {
49 vec: Default::default(),
50 f,
51 _o: PhantomData,
52 };
53 self.add_window_operator("WindowMap", acc)
54 }
55}