noir_compute/operator/window/aggr/
nth.rs1use super::super::*;
2use crate::operator::{Data, DataKey, Operator};
3use crate::stream::{KeyedStream, WindowedStream};
4
5#[derive(Clone)]
6pub(crate) struct First<T>(Option<T>);
7
8impl<T: Data> WindowAccumulator for First<T> {
9 type In = T;
10 type Out = T;
11
12 #[inline]
13 fn process(&mut self, el: Self::In) {
14 if self.0.is_none() {
15 self.0 = Some(el);
16 }
17 }
18
19 #[inline]
20 fn output(self) -> Self::Out {
21 self.0
22 .expect("First::output() called before any element was processed")
23 }
24}
25
26#[derive(Clone)]
27pub(crate) struct Last<T>(Option<T>);
28
29impl<T: Data> WindowAccumulator for Last<T> {
30 type In = T;
31 type Out = T;
32
33 #[inline]
34 fn process(&mut self, el: Self::In) {
35 self.0 = Some(el);
36 }
37
38 #[inline]
39 fn output(self) -> Self::Out {
40 self.0
41 .expect("First::output() called before any element was processed")
42 }
43}
44
45impl<Key, Out, WindowDescr, OperatorChain> WindowedStream<OperatorChain, Out, WindowDescr>
46where
47 WindowDescr: WindowDescription<Out>,
48 OperatorChain: Operator<Out = (Key, Out)> + 'static,
49 Key: DataKey,
50 Out: Data,
51{
52 pub fn first(self) -> KeyedStream<impl Operator<Out = (Key, Out)>> {
53 let acc = First(None);
54 self.add_window_operator("WindowFirst", acc)
55 }
56}
57
58impl<Key, Out, WindowDescr, OperatorChain> WindowedStream<OperatorChain, Out, WindowDescr>
59where
60 WindowDescr: WindowDescription<Out>,
61 OperatorChain: Operator<Out = (Key, Out)> + 'static,
62 Key: DataKey,
63 Out: Data,
64{
65 pub fn last(self) -> KeyedStream<impl Operator<Out = (Key, Out)>> {
66 let acc = Last(None);
67 self.add_window_operator("WindowLast", acc)
68 }
69}