noir_compute/operator/window/descr/event_time.rs

1use std::collections::VecDeque;
2
3use super::super::*;
4use crate::operator::{Data, StreamElement, Timestamp};
5
6#[derive(Clone, Debug)]
7pub struct EventTimeWindowManager<A>
8where
9    A: WindowAccumulator,
10{
11    init: A,
12    size: Timestamp,
13    slide: Timestamp,
14    last_watermark: Option<Timestamp>,
15    ws: VecDeque<Slot<A>>,
16}
17impl<A: WindowAccumulator> EventTimeWindowManager<A> {
18    fn alloc_windows(&mut self, ts: Timestamp) {
19        assert!(self.last_watermark.map(|w| ts >= w).unwrap_or(true));
20
21        while self.ws.back().map(|b| b.start < ts).unwrap_or(true) {
22            let mut next_start = self.ws.back().map(|b| b.start + self.slide).unwrap_or(ts);
23            // Skip empty windows
24            if let Some(w) = self.last_watermark {
25                next_start += (w - next_start).max(0) / self.slide * self.slide
26            }
27
28            log::trace!("New window {}..{}", next_start, next_start + self.size);
29            self.ws.push_back(Slot::new(
30                self.init.clone(),
31                next_start,
32                next_start + self.size,
33            ));
34        }
35    }
36}
37
38#[derive(Clone, Debug)]
39struct Slot<A> {
40    acc: A,
41    start: Timestamp,
42    end: Timestamp,
43    active: bool,
44}
45
46impl<A> Slot<A> {
47    #[inline]
48    fn new(acc: A, start: Timestamp, end: Timestamp) -> Self {
49        Self {
50            acc,
51            start,
52            end,
53            active: false,
54        }
55    }
56}
57
58impl<A: WindowAccumulator> WindowManager for EventTimeWindowManager<A>
59where
60    A::In: Data,
61    A::Out: Data,
62{
63    type In = A::In;
64    type Out = A::Out;
65    type Output = Vec<WindowResult<A::Out>>;
66
67    #[inline]
68    fn process(&mut self, el: StreamElement<A::In>) -> Self::Output {
69        match el {
70            StreamElement::Timestamped(item, ts) => {
71                self.alloc_windows(ts);
72                self.ws
73                    .iter_mut()
74                    .skip_while(|w| w.end <= ts)
75                    .take_while(|w| w.start <= ts)
76                    .for_each(|w| {
77                        w.acc.process(item.clone());
78                        w.active = true;
79                    });
80
81                Vec::new()
82            }
83            StreamElement::Watermark(ts) => {
84                self.last_watermark = Some(ts);
85                let split = self.ws.partition_point(|w| w.end < ts);
86                self.ws
87                    .drain(..split)
88                    .filter(|w| w.active)
89                    .map(|w| WindowResult::Timestamped(w.acc.output(), w.end))
90                    .collect()
91            }
92            StreamElement::FlushAndRestart | StreamElement::Terminate => self
93                .ws
94                .drain(..)
95                .filter(|w| w.active)
96                .map(|w| WindowResult::Timestamped(w.acc.output(), w.end))
97                .collect(),
98            StreamElement::Item(_) => {
99                panic!("Event time windows can only handle timestamped items!")
100            }
101            _ => Vec::new(),
102        }
103    }
104
105    fn recycle(&self) -> bool {
106        self.ws.is_empty()
107    }
108}
109
110/// Window based on event timestamps
111#[derive(Clone)]
112pub struct EventTimeWindow {
113    size: Timestamp,
114    slide: Timestamp,
115}
116
117impl EventTimeWindow {
118    #[inline]
119    pub fn sliding(size: Timestamp, slide: Timestamp) -> Self {
120        assert!(size > 0, "window size must be > 0");
121        assert!(slide > 0, "window slide must be > 0");
122        Self { size, slide }
123    }
124
125    #[inline]
126    pub fn tumbling(size: Timestamp) -> Self {
127        assert!(size > 0, "window size must be > 0");
128        Self { size, slide: size }
129    }
130}
131
132impl<T: Data> WindowDescription<T> for EventTimeWindow {
133    type Manager<A: WindowAccumulator<In = T>> = EventTimeWindowManager<A>;
134
135    #[inline]
136    fn build<A: WindowAccumulator<In = T>>(&self, accumulator: A) -> Self::Manager<A> {
137        EventTimeWindowManager {
138            init: accumulator,
139            size: self.size,
140            slide: self.slide,
141            last_watermark: Default::default(),
142            ws: Default::default(),
143        }
144    }
145}
146
#[cfg(test)]
mod tests {
    use super::*;
    use crate::operator::window::aggr::Fold;

    // Unwraps every window result returned by the manager and appends the
    // payloads to the given vector.
    macro_rules! save_result {
        ($ret:expr, $v:expr) => {{
            $v.extend($ret.into_iter().map(|res| res.unwrap_item()));
        }};
    }

    #[test]
    fn event_time_window() {
        let descr = EventTimeWindow::sliding(5, 4);
        let fold = Fold::new(Vec::new(), |acc, x| acc.push(x));
        let mut mgr = descr.build(fold);

        // The timestamp advances by one every ten elements; after every seventh
        // element a watermark at the current timestamp is sent.
        let mut results = Vec::new();
        for i in 1i64..100 {
            let ts = i / 10;
            save_result!(mgr.process(StreamElement::Timestamped(i, ts)), results);
            if i % 7 == 0 {
                save_result!(mgr.process(StreamElement::Watermark(ts)), results);
            }
        }
        save_result!(mgr.process(StreamElement::FlushAndRestart), results);

        results.sort();

        let expected: Vec<Vec<_>> =
            vec![(1..50).collect(), (40..90).collect(), (80..100).collect()];
        assert_eq!(results, expected);
    }

    #[test]
    fn event_time_window_spars() {
        let descr = EventTimeWindow::sliding(5, 4);
        let fold = Fold::new(Vec::new(), |acc, x| acc.push(x));
        let mut mgr = descr.build(fold);

        // Only timestamps 1, 15, 16, 30 and 31 carry an element, while a
        // watermark is sent every 3 units, so most windows stay empty.
        let mut results = Vec::new();
        for t in 1..40 {
            let has_element = t % 15 <= 1;
            if has_element {
                save_result!(mgr.process(StreamElement::Timestamped(t, t)), results);
            }
            if t % 3 == 0 {
                save_result!(mgr.process(StreamElement::Watermark(t)), results);
            }
        }
        save_result!(mgr.process(StreamElement::FlushAndRestart), results);

        results.sort();

        let expected: Vec<Vec<_>> = vec![vec![1], vec![15, 16], vec![30, 31]];
        assert_eq!(results, expected);
    }
}