noir_compute/operator/window/descr/
count.rs1use std::collections::VecDeque;
4
5use crate::operator::{Data, StreamElement, Timestamp};
9
10use super::super::*;
11
12#[derive(Clone)]
13pub struct CountWindowManager<A> {
14 init: A,
15 size: usize,
16 slide: usize,
17 exact: bool,
18 ws: VecDeque<Slot<A>>,
19}
20
21#[derive(Clone)]
22struct Slot<A> {
23 count: usize,
24 acc: A,
25 ts: Option<Timestamp>,
26}
27
28impl<A> Slot<A> {
29 #[inline]
30 fn new(acc: A) -> Self {
31 Self {
32 count: 0,
33 acc,
34 ts: None,
35 }
36 }
37}
38
39impl<A: WindowAccumulator> CountWindowManager<A> {
40 #[inline]
41 fn update_slot(&mut self, idx: usize, el: A::In, ts: Option<Timestamp>) {
42 self.ws[idx].count += 1;
43 self.ws[idx].ts = match (self.ws[idx].ts, ts) {
44 (Some(a), Some(b)) => Some(a.max(b)),
45 (Some(t), None) | (None, Some(t)) => Some(t),
46 (None, None) => None,
47 };
48 self.ws[idx].acc.process(el);
49 }
50}
51
52impl<A: WindowAccumulator> WindowManager for CountWindowManager<A>
53where
54 A::In: Data,
55 A::Out: Data,
56{
57 type In = A::In;
58 type Out = A::Out;
59 type Output = Option<WindowResult<A::Out>>;
60
61 #[inline]
62 fn process(&mut self, el: StreamElement<A::In>) -> Self::Output {
63 let ts = el.timestamp().cloned();
64 match el {
65 StreamElement::Item(item) | StreamElement::Timestamped(item, _) => {
66 while self.ws.len() < (self.size + self.slide - 1) / self.slide {
67 self.ws.push_back(Slot::new(self.init.clone()))
68 }
69 let k = self.ws.front().unwrap().count / self.slide + 1; for i in 0..k {
71 self.update_slot(i, item.clone(), ts);
72 }
73 if self.ws[0].count == self.size {
74 let r = self.ws.pop_front().unwrap();
75 Some(WindowResult::new(r.acc.output(), r.ts))
76 } else {
77 None
78 }
79 }
80 StreamElement::FlushAndRestart | StreamElement::Terminate => {
81 let ret = if self.exact {
82 None
83 } else {
84 self.ws
85 .pop_front()
86 .filter(|r| r.count > 0)
87 .map(|r| WindowResult::new(r.acc.output(), r.ts))
88 };
89 self.ws.drain(..);
90 ret
91 }
92 _ => None,
93 }
94 }
95}
96
97#[derive(Clone)]
99pub struct CountWindow {
100 pub size: usize,
101 pub slide: usize,
102 pub exact: bool,
105}
106
107impl CountWindow {
108 #[inline]
112 pub fn new(size: usize, slide: usize, exact: bool) -> Self {
113 Self { size, slide, exact }
114 }
115
116 #[inline]
118 pub fn sliding(size: usize, slide: usize) -> Self {
119 assert!(size > 0, "window size must be > 0"); assert!(slide > 0, "window slide must be > 0");
121 Self {
122 size,
123 slide,
124 exact: true,
125 }
126 }
127
128 #[inline]
130 pub fn tumbling(size: usize) -> Self {
131 assert!(size > 0, "window size must be > 0");
132 Self {
133 size,
134 slide: size,
135 exact: true,
136 }
137 }
138}
139
140impl<T: Data> WindowDescription<T> for CountWindow {
141 type Manager<A: WindowAccumulator<In = T>> = CountWindowManager<A>;
142
143 #[inline]
144 fn build<A: WindowAccumulator<In = T>>(&self, accumulator: A) -> Self::Manager<A> {
145 CountWindowManager {
146 init: accumulator,
147 size: self.size,
148 slide: self.slide,
149 exact: self.exact,
150 ws: Default::default(),
151 }
152 }
153}
154
155#[cfg(test)]
156mod tests {
157 use super::*;
158 use crate::operator::window::aggr::Fold;
159
160 macro_rules! check_return {
161 ($ret:expr, $v:expr) => {{
162 let mut ia = $ret.into_iter();
163 let mut ib = $v.into_iter();
164 loop {
165 let (a, b) = (ia.next(), ib.next());
166 assert_eq!(a, b);
167
168 if let (None, None) = (a, b) {
169 break;
170 }
171 }
172 }};
173 }
174
175 #[test]
176 fn count_window() {
177 let size = 3;
178 let slide = 2;
179 let window = CountWindow::sliding(3, 2);
180
181 let fold: Fold<isize, Vec<isize>, _> = Fold::new(Vec::new(), |v, el| v.push(el));
182 let mut manager = window.build(fold);
183
184 for i in 1..100 {
185 let expected = if i >= size && (i - size) % slide == 0 {
186 let v = ((i - size + 1)..=(i)).collect::<Vec<_>>();
187 Some(WindowResult::Item(v))
188 } else {
189 None
190 };
191 eprintln!("{expected:?}");
192 check_return!(manager.process(StreamElement::Item(i)), expected);
193 }
194 }
195
196 #[test]
197 #[cfg(feature = "timestamp")]
198 fn count_window_timestamped() {
199 let size = 3;
200 let slide = 2;
201 let window = CountWindow::sliding(3, 2);
202
203 let fold: Fold<isize, Vec<isize>, _> = Fold::new(Vec::new(), |v, el| v.push(el));
204 let mut manager = window.build(fold);
205
206 for i in 1..100 {
207 let expected = if i >= size && (i - size) % slide == 0 {
208 let v = ((i - size + 1)..=(i)).collect::<Vec<_>>();
209 Some(WindowResult::Timestamped(v, i as i64 / 2))
210 } else {
211 None
212 };
213 eprintln!("{expected:?}");
214 check_return!(
215 manager.process(StreamElement::Timestamped(i, i as i64 / 2)),
216 expected
217 );
218 }
219 }
220}