noir_compute/operator/window/descr/transaction.rs
1use super::super::*;
2use crate::operator::{Data, StreamElement};
3
4/// Controls the status of a transaction window after the current element has been accumulated.
5#[derive(Default)]
6pub enum TransactionOp {
7 /// Keep the window open and continue accumulating elements in this window.
8 #[default]
9 Continue,
10 /// Output the result of the accumulator for this window.
11 Commit,
12 /// Output the result of the accumulator when the watermark is greater than a timestamp.
13 CommitAfter(Timestamp),
14 /// Discard the result of the accumulator for this window.
15 Discard,
16}
17
18#[derive(Clone)]
19pub struct TransactionWindowManager<A, F>
20where
21 A: WindowAccumulator,
22 F: Fn(&A::In) -> TransactionOp,
23{
24 init: A,
25 f: F,
26 w: Option<Slot<A>>,
27}
28
29#[derive(Clone)]
30struct Slot<A> {
31 acc: A,
32 close: Option<Timestamp>,
33}
34
35impl<A> Slot<A> {
36 #[inline]
37 fn new(acc: A) -> Self {
38 Self { acc, close: None }
39 }
40}
41
42impl<A: WindowAccumulator, F: Fn(&A::In) -> TransactionOp + Clone + Send + 'static> WindowManager
43 for TransactionWindowManager<A, F>
44where
45 A::In: Data,
46 A::Out: Data,
47{
48 type In = A::In;
49 type Out = A::Out;
50 type Output = Option<WindowResult<A::Out>>;
51
52 #[inline]
53 fn process(&mut self, el: StreamElement<A::In>) -> Self::Output {
54 macro_rules! return_current {
55 () => {
56 return Some(WindowResult::Item(self.w.take().unwrap().acc.output()))
57 };
58 }
59
60 match el {
61 StreamElement::Timestamped(item, _ts) => {
62 let slot = self.w.get_or_insert_with(|| Slot::new(self.init.clone()));
63
64 let command = (self.f)(&item);
65 slot.acc.process(item);
66
67 match command {
68 TransactionOp::Commit => return_current!(),
69 TransactionOp::CommitAfter(t) => slot.close = Some(t),
70 TransactionOp::Discard => self.w = None,
71 TransactionOp::Continue => {}
72 }
73 }
74 StreamElement::Watermark(ts) => {
75 if let Some(close) = self.w.as_ref().and_then(|w| w.close) {
76 if close < ts {
77 return_current!()
78 }
79 }
80 }
81 StreamElement::Terminate | StreamElement::FlushAndRestart
82 if self.w.as_ref().and_then(|w| w.close).is_some() =>
83 {
84 return_current!()
85 }
86 StreamElement::Item(_) => panic!(
87 "Non timestamped streams are not currently supported with transaction windows!"
88 ),
89 _ => {}
90 }
91 None
92 }
93
94 fn recycle(&self) -> bool {
95 self.w.is_none()
96 }
97}
98
99/// Window that closes according to user supplied logic
100///
101/// + Windows are implicitly created when the first element for the partition is received.
102/// + Only one window per partition can be active at the same time.
103/// + The `logic` function determines when (and if) the window should be committed producing an output.
104/// + The `logic` function is called on each element before it is passed to the accumulator.
105/// + Returning [`TransactionOp::Continue`] will keep the window open and continue processing.
106/// + Returing [`TransactionOp::Commit`] will close the current window and generate an output.
107/// (The element triggering the commit will be included in the window)
108/// + Returning [`TransactionOp::Discard`] will close the window dropping the accumulator without producing
109/// an output
110/// + Returning [`TransactionOp::CommitAfter`] will register the window to be commited after a watermark
111/// with event time greater than the specified time has been received. The commit time can be overwritten
112/// by another message or cancelled by returning [`TransactionOp::Discard`].
113#[derive(Clone)]
114pub struct TransactionWindow<T, F: Fn(&T) -> TransactionOp> {
115 logic: F,
116 _t: PhantomData<T>,
117}
118
119impl<T, F: Fn(&T) -> TransactionOp> TransactionWindow<T, F> {
120 #[inline]
121 pub fn new(logic: F) -> Self {
122 Self {
123 logic,
124 _t: PhantomData,
125 }
126 }
127}
128
129impl<T: Data, F: Fn(&T) -> TransactionOp + Data> WindowDescription<T> for TransactionWindow<T, F> {
130 type Manager<A: WindowAccumulator<In = T>> = TransactionWindowManager<A, F>;
131
132 #[inline]
133 fn build<A: WindowAccumulator<In = T>>(&self, accumulator: A) -> Self::Manager<A> {
134 TransactionWindowManager {
135 init: accumulator,
136 f: self.logic.clone(),
137 w: None,
138 }
139 }
140}
141
142// #[cfg(test)]
143// mod tests {
144// use std::time::Duration;
145
146// use super::*;
147// use crate::operator::window::aggr::Fold;
148
149// macro_rules! save_result {
150// ($ret:expr, $v:expr) => {{
151// let iter = $ret.into_iter().map(|r| r.unwrap_item());
152// $v.extend(iter);
153// }};
154// }
155
156// #[test]
157// fn event_time_window() {
158// let window = TransactionWindow::new(Duration::from_millis(10));
159
160// let fold = Fold::new(Vec::new(), |v, el| v.push(el));
161// let mut manager = window.build(fold);
162
163// let mut received = Vec::new();
164// for i in 0..100i64 {
165// if i == 33 || i == 80 {
166// std::thread::sleep(Duration::from_millis(11))
167// }
168// save_result!(
169// manager.process(StreamElement::Timestamped(i, i / 10)),
170// received
171// );
172// }
173// save_result!(manager.process(StreamElement::FlushAndRestart), received);
174
175// received.sort();
176
177// let expected: Vec<Vec<_>> =
178// vec![(0..33).collect(), (33..80).collect(), (80..100).collect()];
179// assert_eq!(received, expected)
180// }
181// }