dbsp/operator/time_series/
window.rs

1use crate::{
2    dynamic::DynData, trace::BatchReaderFactories, typed_batch::TypedBox, DBData, OrdIndexedZSet,
3    RootCircuit, Stream, ZWeight,
4};
5
6impl<K, V> Stream<RootCircuit, OrdIndexedZSet<K, V>>
7where
8    K: DBData,
9    V: DBData,
10{
11    /// Extract a subset of values that fall within a moving window from a
12    /// stream of time-indexed values.
13    ///
14    /// This is a general form of the windowing operator that supports tumbling,
15    /// rolling windows, watermarks, etc., by relying on a user-supplied
16    /// function to compute window bounds at each clock cycle.
17    ///
18    /// This operator maintains the window **incrementally**, i.e., it outputs
19    /// changes to the contents of the window at each clock cycle.  The
20    /// complete contents of the window can be computed by integrating the
21    /// output stream.
22    ///
23    /// # Arguments
24    ///
25    /// * `self` - stream of indexed Z-sets (indexed by time).  The notion of
26    ///   time here is distinct from the DBSP logical time and can be modeled
27    ///   using any type that implements `Ord`.
28    ///
29    /// * `bounds` - stream that contains window bounds to use at each clock
30    ///   cycle.  At each clock cycle, it contains a `(start_time, end_time)`
31    ///   that describes a right-open time range `[start_time..end_time)`, where
32    ///   `end_time >= start_time`.  `start_time` must grow monotonically, i.e.,
33    ///   `start_time1` and `start_time2` read from the stream at two successive
34    ///   clock cycles must satisfy `start_time2 >= start_time1`.
35    ///
36    /// # Output
37    ///
38    /// The output stream contains **changes** to the contents of the window: at
39    /// every clock cycle it retracts values that belonged to the window at
40    /// the previous cycle, but no longer do, and inserts new values added
41    /// to the window.  The latter include new values in the input stream
42    /// that belong to the `[start_time..end_time)` range and values from
43    /// earlier inputs that fall within the new range, but not the previous
44    /// range.
45    ///
46    /// # Circuit
47    ///
48    /// ```text
49    ///                       bounds
50    ///
51    /// ───────────────────────────────────────────────────┐
52    ///                                                    │
53    ///        ┌────────────────────────────────────────┐  │
54    ///        │                                        │  │
55    ///        │                                        ▼  ▼
56    /// self   │     ┌────────────────┐             ┌───────────┐
57    /// ───────┴────►│ TraceAppend    ├──┐          │ Window    ├─────►
58    ///              └────────────────┘  │          └───────────┘
59    ///                ▲                 │                 ▲
60    ///                │    ┌──┐         │                 │
61    ///                └────┤z1│◄────────┘                 │
62    ///                     └┬─┘                           │
63    ///                      │            trace            │
64    ///                      └─────────────────────────────┘
65    /// ```
66    pub fn window(
67        &self,
68        inclusive: (bool, bool),
69        bounds: &Stream<RootCircuit, (TypedBox<K, DynData>, TypedBox<K, DynData>)>,
70    ) -> Stream<RootCircuit, OrdIndexedZSet<K, V>> {
71        let input_factories = BatchReaderFactories::new::<K, V, ZWeight>();
72
73        let bounds = unsafe { bounds.transmute_payload::<(Box<DynData>, Box<DynData>)>() };
74        self.inner()
75            .dyn_window_mono(&input_factories, inclusive, &bounds)
76            .typed()
77    }
78}