noir_compute/operator/window/descr/
session.rs1use std::time::{Duration, Instant};
2
3use super::super::*;
4use crate::operator::{Data, StreamElement};
5
6#[derive(Clone)]
7pub struct SessionWindowManager<A>
8where
9 A: WindowAccumulator,
10{
11 init: A,
12 gap: Duration,
13 w: Option<Slot<A>>,
14}
15
/// State of a single open session: the accumulator being filled and the
/// instant recorded for the most recent update.
#[derive(Clone)]
struct Slot<A> {
    /// Accumulator collecting the elements of the session.
    acc: A,
    /// Instant associated with the latest activity of the session.
    last: Instant,
}

impl<A> Slot<A> {
    /// Builds a slot from an accumulator and the instant to record as its
    /// last activity.
    #[inline]
    fn new(acc: A, last: Instant) -> Self {
        Slot { acc, last }
    }
}
28
29impl<A: WindowAccumulator> WindowManager for SessionWindowManager<A>
30where
31 A::In: Data,
32 A::Out: Data,
33{
34 type In = A::In;
35 type Out = A::Out;
36 type Output = Option<WindowResult<A::Out>>;
37
38 #[inline]
39 fn process(&mut self, el: StreamElement<A::In>) -> Self::Output {
40 let ts = Instant::now();
41
42 let ret = match &self.w {
43 Some(slot) if ts - slot.last > self.gap => {
44 let output = self.w.take().unwrap().acc.output();
45 Some(WindowResult::Item(output))
46 }
47 _ => None,
48 };
49
50 match el {
51 StreamElement::Item(item) | StreamElement::Timestamped(item, _) => {
52 let slot = self
53 .w
54 .get_or_insert_with(|| Slot::new(self.init.clone(), ts));
55 slot.acc.process(item);
56 slot.last = ts;
57 ret
58 }
59 StreamElement::Terminate | StreamElement::FlushAndRestart => {
60 ret.or_else(|| self.w.take().map(|s| WindowResult::Item(s.acc.output())))
61 }
62 _ => ret,
63 }
64 }
65}
66
/// Description of a session window: elements are grouped into sessions that
/// end after `gap` of inactivity.
#[derive(Clone)]
pub struct SessionWindow {
    /// Idle time that separates two consecutive sessions.
    gap: Duration,
}

impl SessionWindow {
    /// Creates a session window description with the given inactivity gap.
    ///
    /// Despite its name, `gap_millis` is a full [`Duration`] and is stored
    /// as-is.
    ///
    /// # Panics
    ///
    /// Panics if `gap_millis` is zero.
    #[inline]
    pub fn new(gap_millis: Duration) -> Self {
        assert!(gap_millis != Duration::ZERO, "window size must be > 0");
        SessionWindow { gap: gap_millis }
    }
}
80
81impl<T: Data> WindowDescription<T> for SessionWindow {
82 type Manager<A: WindowAccumulator<In = T>> = SessionWindowManager<A>;
83
84 #[inline]
85 fn build<A: WindowAccumulator<In = T>>(&self, accumulator: A) -> Self::Manager<A> {
86 SessionWindowManager {
87 init: accumulator,
88 gap: self.gap,
89 w: Default::default(),
90 }
91 }
92}
93
#[cfg(test)]
mod tests {
    use std::time::Duration;

    use super::*;
    use crate::operator::window::aggr::Fold;

    /// Feeds 100 elements with two pauses longer than the session gap and
    /// checks that exactly three sessions are produced, split at the pauses.
    ///
    /// Sessions are delimited by wall-clock time between calls, so the test
    /// sleeps for slightly longer than the gap before elements 33 and 80.
    #[test]
    fn event_time_window() {
        let gap = Duration::from_millis(10);
        let pause = Duration::from_millis(11);

        let window = SessionWindow::new(gap);
        let fold = Fold::new(Vec::new(), |v, el| v.push(el));
        let mut manager = window.build(fold);

        let mut received = Vec::new();
        for i in 0..100i64 {
            if matches!(i, 33 | 80) {
                std::thread::sleep(pause);
            }
            let closed = manager.process(StreamElement::Timestamped(i, i / 10));
            received.extend(closed.map(|r| r.unwrap_item()));
        }
        // Flushing closes the session that is still open.
        let closed = manager.process(StreamElement::FlushAndRestart);
        received.extend(closed.map(|r| r.unwrap_item()));

        received.sort();

        let expected: Vec<Vec<i64>> = vec![0..33, 33..80, 80..100]
            .into_iter()
            .map(|range| range.collect())
            .collect();
        assert_eq!(received, expected)
    }
}