Skip to main content

dbsp/operator/
neighborhood.rs

1use crate::{
2    RootCircuit, Stream, ZWeight,
3    dynamic::{DynData, DynDataTyped, DynPair},
4    operator::dynamic::neighborhood::{
5        DynNeighborhoodDescr, NeighborhoodDescr, NeighborhoodFactories,
6    },
7    typed_batch::{BatchReader, DynBatchReader, DynOrdZSet, IndexedZSet, TypedBatch, TypedBox},
8    utils::Tup2,
9};
10
11pub type NeighborhoodDescrBox<K, V> =
12    TypedBox<NeighborhoodDescr<K, V>, DynNeighborhoodDescr<DynData, DynData>>;
13
14pub type NeighborhoodDescrStream<K, V> = Stream<RootCircuit, Option<NeighborhoodDescrBox<K, V>>>;
15
16/// See [`crate::operator::DynNeighborhood`].
17pub type Neighborhood<B> = TypedBatch<
18    Tup2<i64, Tup2<<B as BatchReader>::Key, <B as BatchReader>::Val>>,
19    (),
20    ZWeight,
21    DynOrdZSet<
22        DynPair<
23            DynDataTyped<i64>,
24            DynPair<
25                <<B as BatchReader>::Inner as DynBatchReader>::Key,
26                <<B as BatchReader>::Inner as DynBatchReader>::Val,
27            >,
28        >,
29    >,
30>;
31
32pub type NeighborhoodStream<B> = Stream<
33    RootCircuit,
34    TypedBatch<
35        Tup2<i64, Tup2<<B as BatchReader>::Key, <B as BatchReader>::Val>>,
36        (),
37        ZWeight,
38        DynOrdZSet<
39            DynPair<
40                DynDataTyped<i64>,
41                DynPair<
42                    <<B as BatchReader>::Inner as DynBatchReader>::Key,
43                    <<B as BatchReader>::Inner as DynBatchReader>::Val,
44                >,
45            >,
46        >,
47    >,
48>;
49
50impl<B> Stream<RootCircuit, B>
51where
52    B: IndexedZSet,
53{
54    /// Returns a small contiguous range of rows ([`Neighborhood`]) of the input
55    /// table.
56    ///
57    /// This operator helps to visualize the contents of the input table in a
58    /// UI.  The UI client may not have enough throughput/memory to store the
59    /// entire table, and will instead limit its state to a small range of
60    /// rows that fit on the screen.  We specify such a range, or
61    /// _neighborhood_, in terms of its center (or "anchor"), and the number
62    /// of rows preceding and following the anchor (see
63    /// [`NeighborhoodDescr`]).  The user may be interested in a static
64    /// snapshot of the neighborhood or in a changing view.  Both modes are
65    /// supported by this operator (see the `reset` argument).  The output of
66    /// the operator is a stream of [`Neighborhood`]s.
67    ///
68    /// NOTE: This operator assumes that the integral of the input stream does
69    /// not contain negative weights (which should normally be the case) and
70    /// may produce incorrect outputs otherwise.
71    ///
72    /// # Arguments
73    ///
74    /// * `self` - a stream of changes to an indexed Z-set.
75    ///
76    /// * `neighborhood_descr` - contains the neighborhood descriptor to
77    ///   evaluate at every clock tick.  Set to `None` to disable the operator
78    ///   (it will output an empty neighborhood).
79    ///
80    /// # Output
81    ///
82    /// Outputs a stream of changes to the neighborhood.
83    ///
84    /// The output neighborhood will contain rows with indexes between
85    /// `-descr.before` and `descr.after - 1`.  Row 0 is the anchor row, i.e.,
86    /// is the first row in the input stream greater than or equal to
87    /// `descr.anchor`.  If there is no such row (i.e., all rows in the input
88    /// stream are smaller than the anchor), then the neighborhood will only
89    /// contain negative indexes.
90    ///
91    /// The first index in the neighborhood may be greater
92    /// than `-descr.before` if the input stream doesn't contain enough rows
93    /// preceding the specified anchor.  The last index may be smaller than
94    /// `descr.after - 1` if the input stream doesn't contain `descr.after`
95    /// rows following the anchor point.
96    #[track_caller]
97    pub fn neighborhood(
98        &self,
99        neighborhood_descr: &NeighborhoodDescrStream<B::Key, B::Val>,
100    ) -> NeighborhoodStream<B> {
101        let factories: NeighborhoodFactories<B::Inner> =
102            NeighborhoodFactories::new::<B::Key, B::Val>();
103
104        let payload = unsafe { neighborhood_descr.transmute_payload() };
105        self.inner().dyn_neighborhood(&factories, &payload).typed()
106    }
107}