dbsp/operator/time_series/window.rs
1use crate::{
2 dynamic::DynData, trace::BatchReaderFactories, typed_batch::TypedBox, DBData, OrdIndexedZSet,
3 RootCircuit, Stream, ZWeight,
4};
5
6impl<K, V> Stream<RootCircuit, OrdIndexedZSet<K, V>>
7where
8 K: DBData,
9 V: DBData,
10{
11 /// Extract a subset of values that fall within a moving window from a
12 /// stream of time-indexed values.
13 ///
14 /// This is a general form of the windowing operator that supports tumbling,
15 /// rolling windows, watermarks, etc., by relying on a user-supplied
16 /// function to compute window bounds at each clock cycle.
17 ///
18 /// This operator maintains the window **incrementally**, i.e., it outputs
19 /// changes to the contents of the window at each clock cycle. The
20 /// complete contents of the window can be computed by integrating the
21 /// output stream.
22 ///
23 /// # Arguments
24 ///
25 /// * `self` - stream of indexed Z-sets (indexed by time). The notion of
26 /// time here is distinct from the DBSP logical time and can be modeled
27 /// using any type that implements `Ord`.
28 ///
29 /// * `bounds` - stream that contains window bounds to use at each clock
30 /// cycle. At each clock cycle, it contains a `(start_time, end_time)`
31 /// that describes a right-open time range `[start_time..end_time)`, where
32 /// `end_time >= start_time`. `start_time` must grow monotonically, i.e.,
33 /// `start_time1` and `start_time2` read from the stream at two successive
34 /// clock cycles must satisfy `start_time2 >= start_time1`.
35 ///
36 /// # Output
37 ///
38 /// The output stream contains **changes** to the contents of the window: at
39 /// every clock cycle it retracts values that belonged to the window at
40 /// the previous cycle, but no longer do, and inserts new values added
41 /// to the window. The latter include new values in the input stream
42 /// that belong to the `[start_time..end_time)` range and values from
43 /// earlier inputs that fall within the new range, but not the previous
44 /// range.
45 ///
46 /// # Circuit
47 ///
48 /// ```text
49 /// bounds
50 ///
51 /// ───────────────────────────────────────────────────┐
52 /// │
53 /// ┌────────────────────────────────────────┐ │
54 /// │ │ │
55 /// │ ▼ ▼
56 /// self │ ┌────────────────┐ ┌───────────┐
57 /// ───────┴────►│ TraceAppend ├──┐ │ Window ├─────►
58 /// └────────────────┘ │ └───────────┘
59 /// ▲ │ ▲
60 /// │ ┌──┐ │ │
61 /// └────┤z1│◄────────┘ │
62 /// └┬─┘ │
63 /// │ trace │
64 /// └─────────────────────────────┘
65 /// ```
66 pub fn window(
67 &self,
68 inclusive: (bool, bool),
69 bounds: &Stream<RootCircuit, (TypedBox<K, DynData>, TypedBox<K, DynData>)>,
70 ) -> Stream<RootCircuit, OrdIndexedZSet<K, V>> {
71 let input_factories = BatchReaderFactories::new::<K, V, ZWeight>();
72
73 let bounds = unsafe { bounds.transmute_payload::<(Box<DynData>, Box<DynData>)>() };
74 self.inner()
75 .dyn_window_mono(&input_factories, inclusive, &bounds)
76 .typed()
77 }
78}