noir_compute/operator/window/descr/
processing_time.rs

1use std::collections::VecDeque;
2use std::time::{Duration, Instant};
3
4use super::super::*;
5use crate::operator::{Data, StreamElement};
6
7#[derive(Clone)]
8pub struct ProcessingTimeWindowManager<A>
9where
10    A: WindowAccumulator,
11{
12    init: A,
13    size: Duration,
14    slide: Duration,
15    ws: VecDeque<Slot<A>>,
16}
17
/// One window instance together with the accumulator gathering its elements.
#[derive(Clone)]
struct Slot<A> {
    /// Accumulator holding the partial result of this window.
    acc: A,
    /// Instant at which the window begins.
    start: Instant,
    /// Instant at which the window ends.
    end: Instant,
    /// Whether the window has been marked as having received an element.
    active: bool,
}

impl<A> Slot<A> {
    /// Builds a window covering `start..end` around `acc`; the window starts out inactive.
    #[inline]
    fn new(acc: A, start: Instant, end: Instant) -> Self {
        let active = false;
        Slot {
            acc,
            start,
            end,
            active,
        }
    }
}
37
38impl<A: WindowAccumulator> WindowManager for ProcessingTimeWindowManager<A>
39where
40    A::In: Data,
41    A::Out: Data,
42{
43    type In = A::In;
44    type Out = A::Out;
45    type Output = Vec<WindowResult<A::Out>>;
46
47    #[inline]
48    fn process(&mut self, el: StreamElement<A::In>) -> Self::Output {
49        let now = Instant::now();
50        match el {
51            StreamElement::Item(item) | StreamElement::Timestamped(item, _) => {
52                // TODO: Windows are not aligned if there are periods without windows, evaluate if it needs to be changed
53                while self.ws.back().map(|b| b.start < now).unwrap_or(true) {
54                    let next_start = self.ws.back().map(|b| b.start + self.slide).unwrap_or(now);
55                    self.ws.push_back(Slot::new(
56                        self.init.clone(),
57                        next_start,
58                        next_start + self.size,
59                    ));
60                }
61                self.ws
62                    .iter_mut()
63                    .skip_while(|w| w.end <= now)
64                    .take_while(|w| w.start <= now)
65                    .for_each(|w| {
66                        w.acc.process(item.clone());
67                        w.active = true;
68                    });
69            }
70            StreamElement::Terminate | StreamElement::FlushAndRestart => {
71                return self
72                    .ws
73                    .drain(..)
74                    .filter(|w| w.active)
75                    .map(|w| WindowResult::Item(w.acc.output()))
76                    .collect();
77            }
78            _ => {}
79        }
80
81        let split = self.ws.partition_point(|w| w.end < now);
82        self.ws
83            .drain(..split)
84            .filter(|w| w.active)
85            .map(|w| WindowResult::Item(w.acc.output()))
86            .collect()
87    }
88}
89
/// Window description driven by the wall clock at the time elements are processed.
#[derive(Clone)]
pub struct ProcessingTimeWindow {
    /// Length of each window.
    size: Duration,
    /// Interval between the starts of successive windows.
    slide: Duration,
}

impl ProcessingTimeWindow {
    /// Describes windows of length `size` where a new window starts every `slide`.
    ///
    /// # Panics
    ///
    /// Panics if `size` or `slide` is zero.
    #[inline]
    pub fn sliding(size: Duration, slide: Duration) -> Self {
        assert!(size > Duration::ZERO, "window size must be > 0");
        assert!(slide > Duration::ZERO, "window slide must be > 0");
        ProcessingTimeWindow { size, slide }
    }

    /// Describes windows of length `size` whose slide equals their size.
    ///
    /// # Panics
    ///
    /// Panics if `size` is zero.
    #[inline]
    pub fn tumbling(size: Duration) -> Self {
        // With `slide == size` only the size check of `sliding` can fail, so a zero size
        // is reported with the size message.
        Self::sliding(size, size)
    }
}
111
112impl<T: Data> WindowDescription<T> for ProcessingTimeWindow {
113    type Manager<A: WindowAccumulator<In = T>> = ProcessingTimeWindowManager<A>;
114
115    #[inline]
116    fn build<A: WindowAccumulator<In = T>>(&self, accumulator: A) -> Self::Manager<A> {
117        ProcessingTimeWindowManager {
118            init: accumulator,
119            size: self.size,
120            slide: self.slide,
121            ws: Default::default(),
122        }
123    }
124}
125
#[cfg(test)]
mod tests {
    use super::*;
    use crate::operator::window::aggr::Fold;

    /// Appends the items carried by every result in `$ret` to `$v`, and bumps `$n` once
    /// for each result whose item is not empty.
    macro_rules! save_result {
        ($ret:expr, $v:expr, $n:ident) => {{
            for result in $ret {
                if !result.item().is_empty() {
                    $n += 1;
                }
                $v.extend(result.unwrap_item());
            }
        }};
    }

    /// Pushes the items `1..100` through a 100µs tumbling window and checks that all of
    /// them come back and that the number of non-empty windows matches the elapsed time.
    // NOTE(review): marked `#[ignore]`, presumably because the expected window count
    // depends on wall-clock timing — confirm before enabling.
    #[test]
    #[ignore]
    fn processing_time_window() {
        let size = Duration::from_micros(100);
        let mut manager = {
            let fold: Fold<isize, Vec<isize>, _> = Fold::new(Vec::new(), |v, el| v.push(el));
            ProcessingTimeWindow::tumbling(size).build(fold)
        };

        let start = Instant::now();
        let mut received = Vec::new();
        let mut n_windows = 0;
        for i in 1..100 {
            let emitted = manager.process(StreamElement::Item(i));
            save_result!(emitted, received, n_windows);
        }
        let expected_n = start.elapsed().as_micros() / size.as_micros() + 1;

        let flushed = manager.process(StreamElement::FlushAndRestart);
        save_result!(flushed, received, n_windows);

        eprintln!("expected {expected_n} windows");

        received.sort_unstable();
        let all_items: Vec<isize> = (1..100).collect();
        assert_eq!(n_windows, expected_n);
        assert_eq!(received, all_items);
    }
}