dbsp/operator/neighborhood.rs
1use crate::{
2 RootCircuit, Stream, ZWeight,
3 dynamic::{DynData, DynDataTyped, DynPair},
4 operator::dynamic::neighborhood::{
5 DynNeighborhoodDescr, NeighborhoodDescr, NeighborhoodFactories,
6 },
7 typed_batch::{BatchReader, DynBatchReader, DynOrdZSet, IndexedZSet, TypedBatch, TypedBox},
8 utils::Tup2,
9};
10
11pub type NeighborhoodDescrBox<K, V> =
12 TypedBox<NeighborhoodDescr<K, V>, DynNeighborhoodDescr<DynData, DynData>>;
13
14pub type NeighborhoodDescrStream<K, V> = Stream<RootCircuit, Option<NeighborhoodDescrBox<K, V>>>;
15
16/// See [`crate::operator::DynNeighborhood`].
17pub type Neighborhood<B> = TypedBatch<
18 Tup2<i64, Tup2<<B as BatchReader>::Key, <B as BatchReader>::Val>>,
19 (),
20 ZWeight,
21 DynOrdZSet<
22 DynPair<
23 DynDataTyped<i64>,
24 DynPair<
25 <<B as BatchReader>::Inner as DynBatchReader>::Key,
26 <<B as BatchReader>::Inner as DynBatchReader>::Val,
27 >,
28 >,
29 >,
30>;
31
32pub type NeighborhoodStream<B> = Stream<
33 RootCircuit,
34 TypedBatch<
35 Tup2<i64, Tup2<<B as BatchReader>::Key, <B as BatchReader>::Val>>,
36 (),
37 ZWeight,
38 DynOrdZSet<
39 DynPair<
40 DynDataTyped<i64>,
41 DynPair<
42 <<B as BatchReader>::Inner as DynBatchReader>::Key,
43 <<B as BatchReader>::Inner as DynBatchReader>::Val,
44 >,
45 >,
46 >,
47 >,
48>;
49
50impl<B> Stream<RootCircuit, B>
51where
52 B: IndexedZSet,
53{
54 /// Returns a small contiguous range of rows ([`Neighborhood`]) of the input
55 /// table.
56 ///
57 /// This operator helps to visualize the contents of the input table in a
58 /// UI. The UI client may not have enough throughput/memory to store the
59 /// entire table, and will instead limit its state to a small range of
60 /// rows that fit on the screen. We specify such a range, or
61 /// _neighborhood_, in terms of its center (or "anchor"), and the number
62 /// of rows preceding and following the anchor (see
63 /// [`NeighborhoodDescr`]). The user may be interested in a static
64 /// snapshot of the neighborhood or in a changing view. Both modes are
65 /// supported by this operator (see the `reset` argument). The output of
66 /// the operator is a stream of [`Neighborhood`]s.
67 ///
68 /// NOTE: This operator assumes that the integral of the input stream does
69 /// not contain negative weights (which should normally be the case) and
70 /// may produce incorrect outputs otherwise.
71 ///
72 /// # Arguments
73 ///
74 /// * `self` - a stream of changes to an indexed Z-set.
75 ///
76 /// * `neighborhood_descr` - contains the neighborhood descriptor to
77 /// evaluate at every clock tick. Set to `None` to disable the operator
78 /// (it will output an empty neighborhood).
79 ///
80 /// # Output
81 ///
82 /// Outputs a stream of changes to the neighborhood.
83 ///
84 /// The output neighborhood will contain rows with indexes between
85 /// `-descr.before` and `descr.after - 1`. Row 0 is the anchor row, i.e.,
86 /// is the first row in the input stream greater than or equal to
87 /// `descr.anchor`. If there is no such row (i.e., all rows in the input
88 /// stream are smaller than the anchor), then the neighborhood will only
89 /// contain negative indexes.
90 ///
91 /// The first index in the neighborhood may be greater
92 /// than `-descr.before` if the input stream doesn't contain enough rows
93 /// preceding the specified anchor. The last index may be smaller than
94 /// `descr.after - 1` if the input stream doesn't contain `descr.after`
95 /// rows following the anchor point.
96 #[track_caller]
97 pub fn neighborhood(
98 &self,
99 neighborhood_descr: &NeighborhoodDescrStream<B::Key, B::Val>,
100 ) -> NeighborhoodStream<B> {
101 let factories: NeighborhoodFactories<B::Inner> =
102 NeighborhoodFactories::new::<B::Key, B::Val>();
103
104 let payload = unsafe { neighborhood_descr.transmute_payload() };
105 self.inner().dyn_neighborhood(&factories, &payload).typed()
106 }
107}