noir_compute/operator/window/descr/
count.rs

//! Count-based windows: the `CountWindow` description and the `CountWindowManager` that implements it.
2
3use std::collections::VecDeque;
4
5// pub use aggregator::*;
6// pub use description::*;
7
8use crate::operator::{Data, StreamElement, Timestamp};
9
10use super::super::*;
11
12#[derive(Clone)]
13pub struct CountWindowManager<A> {
14    init: A,
15    size: usize,
16    slide: usize,
17    exact: bool,
18    ws: VecDeque<Slot<A>>,
19}
20
21#[derive(Clone)]
22struct Slot<A> {
23    count: usize,
24    acc: A,
25    ts: Option<Timestamp>,
26}
27
28impl<A> Slot<A> {
29    #[inline]
30    fn new(acc: A) -> Self {
31        Self {
32            count: 0,
33            acc,
34            ts: None,
35        }
36    }
37}
38
39impl<A: WindowAccumulator> CountWindowManager<A> {
40    #[inline]
41    fn update_slot(&mut self, idx: usize, el: A::In, ts: Option<Timestamp>) {
42        self.ws[idx].count += 1;
43        self.ws[idx].ts = match (self.ws[idx].ts, ts) {
44            (Some(a), Some(b)) => Some(a.max(b)),
45            (Some(t), None) | (None, Some(t)) => Some(t),
46            (None, None) => None,
47        };
48        self.ws[idx].acc.process(el);
49    }
50}
51
52impl<A: WindowAccumulator> WindowManager for CountWindowManager<A>
53where
54    A::In: Data,
55    A::Out: Data,
56{
57    type In = A::In;
58    type Out = A::Out;
59    type Output = Option<WindowResult<A::Out>>;
60
61    #[inline]
62    fn process(&mut self, el: StreamElement<A::In>) -> Self::Output {
63        let ts = el.timestamp().cloned();
64        match el {
65            StreamElement::Item(item) | StreamElement::Timestamped(item, _) => {
66                while self.ws.len() < (self.size + self.slide - 1) / self.slide {
67                    self.ws.push_back(Slot::new(self.init.clone()))
68                }
69                let k = self.ws.front().unwrap().count / self.slide + 1; // TODO: Check
70                for i in 0..k {
71                    self.update_slot(i, item.clone(), ts);
72                }
73                if self.ws[0].count == self.size {
74                    let r = self.ws.pop_front().unwrap();
75                    Some(WindowResult::new(r.acc.output(), r.ts))
76                } else {
77                    None
78                }
79            }
80            StreamElement::FlushAndRestart | StreamElement::Terminate => {
81                let ret = if self.exact {
82                    None
83                } else {
84                    self.ws
85                        .pop_front()
86                        .filter(|r| r.count > 0)
87                        .map(|r| WindowResult::new(r.acc.output(), r.ts))
88                };
89                self.ws.drain(..);
90                ret
91            }
92            _ => None,
93        }
94    }
95}
96
/// Window of fixed count of elements
///
/// Windows contain `size` elements and are generated each `slide` elements.
#[derive(Clone, Debug)]
pub struct CountWindow {
    /// Number of elements contained in each window.
    pub size: usize,
    /// Number of elements after which a new window is generated.
    pub slide: usize,
    /// If exact is `true`, only results from windows of size `size` will be returned.
    /// If exact is `false`, on flush or terminate, the first incomplete window result will be
    /// returned if present
    pub exact: bool,
}
106
107impl CountWindow {
108    /// Windows of `size` elements, generated each `slide` elements.
109    /// If exact is `true`, only results from windows of size `size` will be returned.
110    /// If exact is `false`, on terminate, the first incomplete window result will be returned if present
111    #[inline]
112    pub fn new(size: usize, slide: usize, exact: bool) -> Self {
113        Self { size, slide, exact }
114    }
115
116    /// Exact windows of `size` elements, generated each `slide` elements
117    #[inline]
118    pub fn sliding(size: usize, slide: usize) -> Self {
119        assert!(size > 0, "window size must be > 0"); // TODO: consider using NonZeroUsize
120        assert!(slide > 0, "window slide must be > 0");
121        Self {
122            size,
123            slide,
124            exact: true,
125        }
126    }
127
128    /// Exact windows of `size` elements
129    #[inline]
130    pub fn tumbling(size: usize) -> Self {
131        assert!(size > 0, "window size must be > 0");
132        Self {
133            size,
134            slide: size,
135            exact: true,
136        }
137    }
138}
139
140impl<T: Data> WindowDescription<T> for CountWindow {
141    type Manager<A: WindowAccumulator<In = T>> = CountWindowManager<A>;
142
143    #[inline]
144    fn build<A: WindowAccumulator<In = T>>(&self, accumulator: A) -> Self::Manager<A> {
145        CountWindowManager {
146            init: accumulator,
147            size: self.size,
148            slide: self.slide,
149            exact: self.exact,
150            ws: Default::default(),
151        }
152    }
153}
154
#[cfg(test)]
mod tests {
    use super::*;
    use crate::operator::window::aggr::Fold;

    /// Window size shared by the tests.
    const SIZE: usize = 3;
    /// Window slide shared by the tests.
    const SLIDE: usize = 2;

    /// Asserts that the two iterables yield equal elements position by position and
    /// that they end at the same time.
    fn assert_same_items<T, L, R>(left: L, right: R)
    where
        T: PartialEq + std::fmt::Debug,
        L: IntoIterator<Item = T>,
        R: IntoIterator<Item = T>,
    {
        let mut left = left.into_iter();
        let mut right = right.into_iter();
        loop {
            let (l, r) = (left.next(), right.next());
            assert_eq!(l, r);

            // The assertion passed, so `r` is `None` exactly when `l` is.
            if l.is_none() {
                break;
            }
        }
    }

    /// Elements `1..100` through a sliding window: a result is expected every `SLIDE`
    /// elements once the first `SIZE` have arrived, holding the last `SIZE` elements.
    #[test]
    fn count_window() {
        let (size, slide) = (SIZE as isize, SLIDE as isize);
        let window = CountWindow::sliding(SIZE, SLIDE);

        let fold: Fold<isize, Vec<isize>, _> = Fold::new(Vec::new(), |v, el| v.push(el));
        let mut manager = window.build(fold);

        for i in 1..100 {
            let closes_window = i >= size && (i - size) % slide == 0;
            let expected = closes_window
                .then(|| WindowResult::Item(((i - size + 1)..=i).collect::<Vec<_>>()));
            eprintln!("{expected:?}");
            assert_same_items(manager.process(StreamElement::Item(i)), expected);
        }
    }

    /// Same scenario with timestamped elements: each result carries the timestamp of
    /// the last element of its window, which is also the greatest one.
    #[test]
    #[cfg(feature = "timestamp")]
    fn count_window_timestamped() {
        let (size, slide) = (SIZE as isize, SLIDE as isize);
        let window = CountWindow::sliding(SIZE, SLIDE);

        let fold: Fold<isize, Vec<isize>, _> = Fold::new(Vec::new(), |v, el| v.push(el));
        let mut manager = window.build(fold);

        for i in 1..100 {
            let closes_window = i >= size && (i - size) % slide == 0;
            let expected = closes_window.then(|| {
                WindowResult::Timestamped(
                    ((i - size + 1)..=i).collect::<Vec<_>>(),
                    i as i64 / 2,
                )
            });
            eprintln!("{expected:?}");
            assert_same_items(
                manager.process(StreamElement::Timestamped(i, i as i64 / 2)),
                expected,
            );
        }
    }
}