noir_compute/operator/window/aggr/
nth.rs

1use super::super::*;
2use crate::operator::{Data, DataKey, Operator};
3use crate::stream::{KeyedStream, WindowedStream};
4
5#[derive(Clone)]
6pub(crate) struct First<T>(Option<T>);
7
8impl<T: Data> WindowAccumulator for First<T> {
9    type In = T;
10    type Out = T;
11
12    #[inline]
13    fn process(&mut self, el: Self::In) {
14        if self.0.is_none() {
15            self.0 = Some(el);
16        }
17    }
18
19    #[inline]
20    fn output(self) -> Self::Out {
21        self.0
22            .expect("First::output() called before any element was processed")
23    }
24}
25
26#[derive(Clone)]
27pub(crate) struct Last<T>(Option<T>);
28
29impl<T: Data> WindowAccumulator for Last<T> {
30    type In = T;
31    type Out = T;
32
33    #[inline]
34    fn process(&mut self, el: Self::In) {
35        self.0 = Some(el);
36    }
37
38    #[inline]
39    fn output(self) -> Self::Out {
40        self.0
41            .expect("First::output() called before any element was processed")
42    }
43}
44
45impl<Key, Out, WindowDescr, OperatorChain> WindowedStream<OperatorChain, Out, WindowDescr>
46where
47    WindowDescr: WindowDescription<Out>,
48    OperatorChain: Operator<Out = (Key, Out)> + 'static,
49    Key: DataKey,
50    Out: Data,
51{
52    pub fn first(self) -> KeyedStream<impl Operator<Out = (Key, Out)>> {
53        let acc = First(None);
54        self.add_window_operator("WindowFirst", acc)
55    }
56}
57
58impl<Key, Out, WindowDescr, OperatorChain> WindowedStream<OperatorChain, Out, WindowDescr>
59where
60    WindowDescr: WindowDescription<Out>,
61    OperatorChain: Operator<Out = (Key, Out)> + 'static,
62    Key: DataKey,
63    Out: Data,
64{
65    pub fn last(self) -> KeyedStream<impl Operator<Out = (Key, Out)>> {
66        let acc = Last(None);
67        self.add_window_operator("WindowLast", acc)
68    }
69}