noir_compute/operator/window/descr/
session.rs

1use std::time::{Duration, Instant};
2
3use super::super::*;
4use crate::operator::{Data, StreamElement};
5
6#[derive(Clone)]
7pub struct SessionWindowManager<A>
8where
9    A: WindowAccumulator,
10{
11    init: A,
12    gap: Duration,
13    w: Option<Slot<A>>,
14}
15
16#[derive(Clone)]
17struct Slot<A> {
18    acc: A,
19    last: Instant,
20}
21
22impl<A> Slot<A> {
23    #[inline]
24    fn new(acc: A, last: Instant) -> Self {
25        Self { acc, last }
26    }
27}
28
29impl<A: WindowAccumulator> WindowManager for SessionWindowManager<A>
30where
31    A::In: Data,
32    A::Out: Data,
33{
34    type In = A::In;
35    type Out = A::Out;
36    type Output = Option<WindowResult<A::Out>>;
37
38    #[inline]
39    fn process(&mut self, el: StreamElement<A::In>) -> Self::Output {
40        let ts = Instant::now();
41
42        let ret = match &self.w {
43            Some(slot) if ts - slot.last > self.gap => {
44                let output = self.w.take().unwrap().acc.output();
45                Some(WindowResult::Item(output))
46            }
47            _ => None,
48        };
49
50        match el {
51            StreamElement::Item(item) | StreamElement::Timestamped(item, _) => {
52                let slot = self
53                    .w
54                    .get_or_insert_with(|| Slot::new(self.init.clone(), ts));
55                slot.acc.process(item);
56                slot.last = ts;
57                ret
58            }
59            StreamElement::Terminate | StreamElement::FlushAndRestart => {
60                ret.or_else(|| self.w.take().map(|s| WindowResult::Item(s.acc.output())))
61            }
62            _ => ret,
63        }
64    }
65}
66
67/// Window that splits after if no element is received for a fixed wall clock duration
68#[derive(Clone)]
69pub struct SessionWindow {
70    gap: Duration,
71}
72
73impl SessionWindow {
74    #[inline]
75    pub fn new(gap_millis: Duration) -> Self {
76        assert!(!gap_millis.is_zero(), "window size must be > 0");
77        Self { gap: gap_millis }
78    }
79}
80
81impl<T: Data> WindowDescription<T> for SessionWindow {
82    type Manager<A: WindowAccumulator<In = T>> = SessionWindowManager<A>;
83
84    #[inline]
85    fn build<A: WindowAccumulator<In = T>>(&self, accumulator: A) -> Self::Manager<A> {
86        SessionWindowManager {
87            init: accumulator,
88            gap: self.gap,
89            w: Default::default(),
90        }
91    }
92}
93
94#[cfg(test)]
95mod tests {
96    use std::time::Duration;
97
98    use super::*;
99    use crate::operator::window::aggr::Fold;
100
101    macro_rules! save_result {
102        ($ret:expr, $v:expr) => {{
103            let iter = $ret.into_iter().map(|r| r.unwrap_item());
104            $v.extend(iter);
105        }};
106    }
107
108    #[test]
109    fn event_time_window() {
110        let window = SessionWindow::new(Duration::from_millis(10));
111
112        let fold = Fold::new(Vec::new(), |v, el| v.push(el));
113        let mut manager = window.build(fold);
114
115        let mut received = Vec::new();
116        for i in 0..100i64 {
117            if i == 33 || i == 80 {
118                std::thread::sleep(Duration::from_millis(11))
119            }
120            save_result!(
121                manager.process(StreamElement::Timestamped(i, i / 10)),
122                received
123            );
124        }
125        save_result!(manager.process(StreamElement::FlushAndRestart), received);
126
127        received.sort();
128
129        let expected: Vec<Vec<_>> =
130            vec![(0..33).collect(), (33..80).collect(), (80..100).collect()];
131        assert_eq!(received, expected)
132    }
133}