noir_compute/operator/window/descr/
transaction.rs

1use super::super::*;
2use crate::operator::{Data, StreamElement};
3
4/// Controls the status of a transaction window after the current element has been accumulated.
5#[derive(Default)]
6pub enum TransactionOp {
7    /// Keep the window open and continue accumulating elements in this window.
8    #[default]
9    Continue,
10    /// Output the result of the accumulator for this window.
11    Commit,
12    /// Output the result of the accumulator when the watermark is greater than a timestamp.
13    CommitAfter(Timestamp),
14    /// Discard the result of the accumulator for this window.
15    Discard,
16}
17
18#[derive(Clone)]
19pub struct TransactionWindowManager<A, F>
20where
21    A: WindowAccumulator,
22    F: Fn(&A::In) -> TransactionOp,
23{
24    init: A,
25    f: F,
26    w: Option<Slot<A>>,
27}
28
29#[derive(Clone)]
30struct Slot<A> {
31    acc: A,
32    close: Option<Timestamp>,
33}
34
35impl<A> Slot<A> {
36    #[inline]
37    fn new(acc: A) -> Self {
38        Self { acc, close: None }
39    }
40}
41
42impl<A: WindowAccumulator, F: Fn(&A::In) -> TransactionOp + Clone + Send + 'static> WindowManager
43    for TransactionWindowManager<A, F>
44where
45    A::In: Data,
46    A::Out: Data,
47{
48    type In = A::In;
49    type Out = A::Out;
50    type Output = Option<WindowResult<A::Out>>;
51
52    #[inline]
53    fn process(&mut self, el: StreamElement<A::In>) -> Self::Output {
54        macro_rules! return_current {
55            () => {
56                return Some(WindowResult::Item(self.w.take().unwrap().acc.output()))
57            };
58        }
59
60        match el {
61            StreamElement::Timestamped(item, _ts) => {
62                let slot = self.w.get_or_insert_with(|| Slot::new(self.init.clone()));
63
64                let command = (self.f)(&item);
65                slot.acc.process(item);
66
67                match command {
68                    TransactionOp::Commit => return_current!(),
69                    TransactionOp::CommitAfter(t) => slot.close = Some(t),
70                    TransactionOp::Discard => self.w = None,
71                    TransactionOp::Continue => {}
72                }
73            }
74            StreamElement::Watermark(ts) => {
75                if let Some(close) = self.w.as_ref().and_then(|w| w.close) {
76                    if close < ts {
77                        return_current!()
78                    }
79                }
80            }
81            StreamElement::Terminate | StreamElement::FlushAndRestart
82                if self.w.as_ref().and_then(|w| w.close).is_some() =>
83            {
84                return_current!()
85            }
86            StreamElement::Item(_) => panic!(
87                "Non timestamped streams are not currently supported with transaction windows!"
88            ),
89            _ => {}
90        }
91        None
92    }
93
94    fn recycle(&self) -> bool {
95        self.w.is_none()
96    }
97}
98
99/// Window that closes according to user supplied logic
100///
101/// + Windows are implicitly created when the first element for the partition is received.
102/// + Only one window per partition can be active at the same time.
103/// + The `logic` function determines when (and if) the window should be committed producing an output.
104/// + The `logic` function is called on each element before it is passed to the accumulator.
105/// + Returning [`TransactionOp::Continue`] will keep the window open and continue processing.
106/// + Returing [`TransactionOp::Commit`] will close the current window and generate an output.
107///   (The element triggering the commit will be included in the window)
108/// + Returning [`TransactionOp::Discard`] will close the window dropping the accumulator without producing
109///   an output
110/// + Returning [`TransactionOp::CommitAfter`] will register the window to be commited after a watermark
111///   with event time greater than the specified time has been received. The commit time can be overwritten
112///   by another message or cancelled by returning [`TransactionOp::Discard`].
113#[derive(Clone)]
114pub struct TransactionWindow<T, F: Fn(&T) -> TransactionOp> {
115    logic: F,
116    _t: PhantomData<T>,
117}
118
119impl<T, F: Fn(&T) -> TransactionOp> TransactionWindow<T, F> {
120    #[inline]
121    pub fn new(logic: F) -> Self {
122        Self {
123            logic,
124            _t: PhantomData,
125        }
126    }
127}
128
129impl<T: Data, F: Fn(&T) -> TransactionOp + Data> WindowDescription<T> for TransactionWindow<T, F> {
130    type Manager<A: WindowAccumulator<In = T>> = TransactionWindowManager<A, F>;
131
132    #[inline]
133    fn build<A: WindowAccumulator<In = T>>(&self, accumulator: A) -> Self::Manager<A> {
134        TransactionWindowManager {
135            init: accumulator,
136            f: self.logic.clone(),
137            w: None,
138        }
139    }
140}
141
142// #[cfg(test)]
143// mod tests {
144//     use std::time::Duration;
145
146//     use super::*;
147//     use crate::operator::window::aggr::Fold;
148
149//     macro_rules! save_result {
150//         ($ret:expr, $v:expr) => {{
151//             let iter = $ret.into_iter().map(|r| r.unwrap_item());
152//             $v.extend(iter);
153//         }};
154//     }
155
156//     #[test]
157//     fn event_time_window() {
158//         let window = TransactionWindow::new(Duration::from_millis(10));
159
160//         let fold = Fold::new(Vec::new(), |v, el| v.push(el));
161//         let mut manager = window.build(fold);
162
163//         let mut received = Vec::new();
164//         for i in 0..100i64 {
165//             if i == 33 || i == 80 {
166//                 std::thread::sleep(Duration::from_millis(11))
167//             }
168//             save_result!(
169//                 manager.process(StreamElement::Timestamped(i, i / 10)),
170//                 received
171//             );
172//         }
173//         save_result!(manager.process(StreamElement::FlushAndRestart), received);
174
175//         received.sort();
176
177//         let expected: Vec<Vec<_>> =
178//             vec![(0..33).collect(), (33..80).collect(), (80..100).collect()];
179//         assert_eq!(received, expected)
180//     }
181// }