noir_compute/operator/window/descr/
event_time.rs1use std::collections::VecDeque;
2
3use super::super::*;
4use crate::operator::{Data, StreamElement, Timestamp};
5
6#[derive(Clone, Debug)]
7pub struct EventTimeWindowManager<A>
8where
9 A: WindowAccumulator,
10{
11 init: A,
12 size: Timestamp,
13 slide: Timestamp,
14 last_watermark: Option<Timestamp>,
15 ws: VecDeque<Slot<A>>,
16}
17impl<A: WindowAccumulator> EventTimeWindowManager<A> {
18 fn alloc_windows(&mut self, ts: Timestamp) {
19 assert!(self.last_watermark.map(|w| ts >= w).unwrap_or(true));
20
21 while self.ws.back().map(|b| b.start < ts).unwrap_or(true) {
22 let mut next_start = self.ws.back().map(|b| b.start + self.slide).unwrap_or(ts);
23 if let Some(w) = self.last_watermark {
25 next_start += (w - next_start).max(0) / self.slide * self.slide
26 }
27
28 log::trace!("New window {}..{}", next_start, next_start + self.size);
29 self.ws.push_back(Slot::new(
30 self.init.clone(),
31 next_start,
32 next_start + self.size,
33 ));
34 }
35 }
36}
37
38#[derive(Clone, Debug)]
39struct Slot<A> {
40 acc: A,
41 start: Timestamp,
42 end: Timestamp,
43 active: bool,
44}
45
46impl<A> Slot<A> {
47 #[inline]
48 fn new(acc: A, start: Timestamp, end: Timestamp) -> Self {
49 Self {
50 acc,
51 start,
52 end,
53 active: false,
54 }
55 }
56}
57
58impl<A: WindowAccumulator> WindowManager for EventTimeWindowManager<A>
59where
60 A::In: Data,
61 A::Out: Data,
62{
63 type In = A::In;
64 type Out = A::Out;
65 type Output = Vec<WindowResult<A::Out>>;
66
67 #[inline]
68 fn process(&mut self, el: StreamElement<A::In>) -> Self::Output {
69 match el {
70 StreamElement::Timestamped(item, ts) => {
71 self.alloc_windows(ts);
72 self.ws
73 .iter_mut()
74 .skip_while(|w| w.end <= ts)
75 .take_while(|w| w.start <= ts)
76 .for_each(|w| {
77 w.acc.process(item.clone());
78 w.active = true;
79 });
80
81 Vec::new()
82 }
83 StreamElement::Watermark(ts) => {
84 self.last_watermark = Some(ts);
85 let split = self.ws.partition_point(|w| w.end < ts);
86 self.ws
87 .drain(..split)
88 .filter(|w| w.active)
89 .map(|w| WindowResult::Timestamped(w.acc.output(), w.end))
90 .collect()
91 }
92 StreamElement::FlushAndRestart | StreamElement::Terminate => self
93 .ws
94 .drain(..)
95 .filter(|w| w.active)
96 .map(|w| WindowResult::Timestamped(w.acc.output(), w.end))
97 .collect(),
98 StreamElement::Item(_) => {
99 panic!("Event time windows can only handle timestamped items!")
100 }
101 _ => Vec::new(),
102 }
103 }
104
105 fn recycle(&self) -> bool {
106 self.ws.is_empty()
107 }
108}
109
110#[derive(Clone)]
112pub struct EventTimeWindow {
113 size: Timestamp,
114 slide: Timestamp,
115}
116
117impl EventTimeWindow {
118 #[inline]
119 pub fn sliding(size: Timestamp, slide: Timestamp) -> Self {
120 assert!(size > 0, "window size must be > 0");
121 assert!(slide > 0, "window slide must be > 0");
122 Self { size, slide }
123 }
124
125 #[inline]
126 pub fn tumbling(size: Timestamp) -> Self {
127 assert!(size > 0, "window size must be > 0");
128 Self { size, slide: size }
129 }
130}
131
132impl<T: Data> WindowDescription<T> for EventTimeWindow {
133 type Manager<A: WindowAccumulator<In = T>> = EventTimeWindowManager<A>;
134
135 #[inline]
136 fn build<A: WindowAccumulator<In = T>>(&self, accumulator: A) -> Self::Manager<A> {
137 EventTimeWindowManager {
138 init: accumulator,
139 size: self.size,
140 slide: self.slide,
141 last_watermark: Default::default(),
142 ws: Default::default(),
143 }
144 }
145}
146
#[cfg(test)]
mod tests {
    use super::*;
    use crate::operator::window::aggr::Fold;

    /// Unwraps every window result in `$ret` and appends the items to `$v`.
    macro_rules! save_result {
        ($ret:expr, $v:expr) => {{
            for result in $ret {
                $v.push(result.unwrap_item());
            }
        }};
    }

    /// Dense stream: ten items per timestamp, a watermark after every seventh item.
    #[test]
    fn event_time_window() {
        let descr = EventTimeWindow::sliding(5, 4);

        let collect_all = Fold::new(Vec::new(), |v, el| v.push(el));
        let mut manager = descr.build(collect_all);

        let mut received = Vec::new();
        for i in 1..100i64 {
            let ts = i / 10;
            let closed = manager.process(StreamElement::Timestamped(i, ts));
            save_result!(closed, received);

            let watermark_due = i % 7 == 0;
            if watermark_due {
                let closed = manager.process(StreamElement::Watermark(ts));
                save_result!(closed, received);
            }
        }
        let closed = manager.process(StreamElement::FlushAndRestart);
        save_result!(closed, received);

        received.sort();

        let expected: Vec<Vec<_>> =
            vec![(1..50).collect(), (40..90).collect(), (80..100).collect()];
        assert_eq!(received, expected)
    }

    /// Sparse stream: only a few items, with watermarks closing windows in between.
    #[test]
    fn event_time_window_spars() {
        let descr = EventTimeWindow::sliding(5, 4);

        let collect_all = Fold::new(Vec::new(), |v, el| v.push(el));
        let mut manager = descr.build(collect_all);

        let mut received = Vec::new();
        for i in 1..40 {
            let has_item = i % 15 <= 1;
            if has_item {
                let closed = manager.process(StreamElement::Timestamped(i, i));
                save_result!(closed, received);
            }

            let watermark_due = i % 3 == 0;
            if watermark_due {
                let closed = manager.process(StreamElement::Watermark(i));
                save_result!(closed, received);
            }
        }
        let closed = manager.process(StreamElement::FlushAndRestart);
        save_result!(closed, received);

        received.sort();

        let expected: Vec<Vec<_>> = vec![vec![1], vec![15, 16], vec![30, 31]];
        assert_eq!(received, expected)
    }
}