noir_compute/operator/window/descr/
processing_time.rs1use std::collections::VecDeque;
2use std::time::{Duration, Instant};
3
4use super::super::*;
5use crate::operator::{Data, StreamElement};
6
7#[derive(Clone)]
8pub struct ProcessingTimeWindowManager<A>
9where
10 A: WindowAccumulator,
11{
12 init: A,
13 size: Duration,
14 slide: Duration,
15 ws: VecDeque<Slot<A>>,
16}
17
18#[derive(Clone)]
19struct Slot<A> {
20 acc: A,
21 start: Instant,
22 end: Instant,
23 active: bool,
24}
25
26impl<A> Slot<A> {
27 #[inline]
28 fn new(acc: A, start: Instant, end: Instant) -> Self {
29 Self {
30 acc,
31 start,
32 end,
33 active: false,
34 }
35 }
36}
37
38impl<A: WindowAccumulator> WindowManager for ProcessingTimeWindowManager<A>
39where
40 A::In: Data,
41 A::Out: Data,
42{
43 type In = A::In;
44 type Out = A::Out;
45 type Output = Vec<WindowResult<A::Out>>;
46
47 #[inline]
48 fn process(&mut self, el: StreamElement<A::In>) -> Self::Output {
49 let now = Instant::now();
50 match el {
51 StreamElement::Item(item) | StreamElement::Timestamped(item, _) => {
52 while self.ws.back().map(|b| b.start < now).unwrap_or(true) {
54 let next_start = self.ws.back().map(|b| b.start + self.slide).unwrap_or(now);
55 self.ws.push_back(Slot::new(
56 self.init.clone(),
57 next_start,
58 next_start + self.size,
59 ));
60 }
61 self.ws
62 .iter_mut()
63 .skip_while(|w| w.end <= now)
64 .take_while(|w| w.start <= now)
65 .for_each(|w| {
66 w.acc.process(item.clone());
67 w.active = true;
68 });
69 }
70 StreamElement::Terminate | StreamElement::FlushAndRestart => {
71 return self
72 .ws
73 .drain(..)
74 .filter(|w| w.active)
75 .map(|w| WindowResult::Item(w.acc.output()))
76 .collect();
77 }
78 _ => {}
79 }
80
81 let split = self.ws.partition_point(|w| w.end < now);
82 self.ws
83 .drain(..split)
84 .filter(|w| w.active)
85 .map(|w| WindowResult::Item(w.acc.output()))
86 .collect()
87 }
88}
89
90#[derive(Clone)]
92pub struct ProcessingTimeWindow {
93 size: Duration,
94 slide: Duration,
95}
96
97impl ProcessingTimeWindow {
98 #[inline]
99 pub fn sliding(size: Duration, slide: Duration) -> Self {
100 assert!(!size.is_zero(), "window size must be > 0");
101 assert!(!slide.is_zero(), "window slide must be > 0");
102 Self { size, slide }
103 }
104
105 #[inline]
106 pub fn tumbling(size: Duration) -> Self {
107 assert!(!size.is_zero(), "window size must be > 0");
108 Self { size, slide: size }
109 }
110}
111
112impl<T: Data> WindowDescription<T> for ProcessingTimeWindow {
113 type Manager<A: WindowAccumulator<In = T>> = ProcessingTimeWindowManager<A>;
114
115 #[inline]
116 fn build<A: WindowAccumulator<In = T>>(&self, accumulator: A) -> Self::Manager<A> {
117 ProcessingTimeWindowManager {
118 init: accumulator,
119 size: self.size,
120 slide: self.slide,
121 ws: Default::default(),
122 }
123 }
124}
125
126#[cfg(test)]
127mod tests {
128 use super::*;
129 use crate::operator::window::aggr::Fold;
130
131 macro_rules! save_result {
132 ($ret:expr, $v:expr, $n:ident) => {{
133 let iter = $ret
134 .into_iter()
135 .inspect(|r| {
136 if !r.item().is_empty() {
137 $n += 1;
138 }
139 })
140 .map(|r| r.unwrap_item())
141 .flatten();
142 $v.extend(iter);
143 }};
144 }
145
146 #[test]
147 #[ignore]
148 fn processing_time_window() {
149 let size = Duration::from_micros(100);
150 let window = ProcessingTimeWindow::tumbling(size);
151
152 let fold: Fold<isize, Vec<isize>, _> = Fold::new(Vec::new(), |v, el| v.push(el));
153 let mut manager = window.build(fold);
154
155 let start = Instant::now();
156 let mut received = Vec::new();
157 let mut n_windows = 0;
158 for i in 1..100 {
159 save_result!(manager.process(StreamElement::Item(i)), received, n_windows);
160 }
161 let expected_n = start.elapsed().as_micros() / size.as_micros() + 1;
162
163 save_result!(
164 manager.process(StreamElement::FlushAndRestart),
165 received,
166 n_windows
167 );
168
169 eprintln!("expected {expected_n} windows");
170
171 received.sort();
172 assert_eq!(n_windows, expected_n);
173 assert_eq!(received, (1..100).collect::<Vec<_>>())
174 }
175}