noir_compute/operator/window/aggr/
collect_vec.rs

1use super::super::*;
2use crate::operator::{Data, DataKey, Operator};
3use crate::stream::{KeyedStream, WindowedStream};
4
/// Window accumulator state that buffers every element it receives in a
/// `Vec` and later hands the whole buffer to a user-supplied function.
///
/// Type parameters:
/// * `I` - type of the buffered elements.
/// * `O` - type produced by applying `f` to the buffer.
/// * `F` - type of the function stored in `f`.
///
/// The struct itself declares no trait bounds; the `Fn(Vec<I>) -> O`
/// requirement on `F` is stated by the impl blocks that actually call `f`.
#[derive(Clone)]
struct CollectVec<I, O, F> {
    /// Elements gathered so far, in insertion order.
    vec: Vec<I>,
    /// Function applied to the buffered elements to produce the output.
    f: F,
    /// Ties the output type `O` to the struct, since no field stores an `O`.
    _o: PhantomData<O>,
}
14
15impl<I, O, F> WindowAccumulator for CollectVec<I, O, F>
16where
17    F: Fn(Vec<I>) -> O + Send + Clone + 'static,
18    I: Clone + Send + 'static,
19    O: Clone + Send + 'static,
20{
21    type In = I;
22
23    type Out = O;
24
25    #[inline]
26    fn process(&mut self, el: Self::In) {
27        self.vec.push(el);
28    }
29
30    #[inline]
31    fn output(self) -> Self::Out {
32        (self.f)(self.vec)
33    }
34}
35
36impl<Key, Out, WindowDescr, OperatorChain> WindowedStream<OperatorChain, Out, WindowDescr>
37where
38    WindowDescr: WindowDescription<Out>,
39    OperatorChain: Operator<Out = (Key, Out)> + 'static,
40    Key: DataKey,
41    Out: Data + Ord,
42{
43    /// Prefer other aggregators if possible as they don't save all elements
44    pub fn map<NewOut: Data, F: Fn(Vec<Out>) -> NewOut + Send + Clone + 'static>(
45        self,
46        f: F,
47    ) -> KeyedStream<impl Operator<Out = (Key, NewOut)>> {
48        let acc = CollectVec::<Out, NewOut, _> {
49            vec: Default::default(),
50            f,
51            _o: PhantomData,
52        };
53        self.add_window_operator("WindowMap", acc)
54    }
55}