1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
use crate::{
RootCircuit, Stream, ZWeight,
dynamic::{DynData, DynDataTyped, DynPair},
operator::dynamic::neighborhood::{
DynNeighborhoodDescr, NeighborhoodDescr, NeighborhoodFactories,
},
typed_batch::{BatchReader, DynBatchReader, DynOrdZSet, IndexedZSet, TypedBatch, TypedBox},
utils::Tup2,
};
/// Statically typed handle to a neighborhood descriptor.
///
/// Wraps a [`NeighborhoodDescr<K, V>`](NeighborhoodDescr) in a [`TypedBox`]
/// whose dynamically typed counterpart is
/// `DynNeighborhoodDescr<DynData, DynData>`.
///
/// NOTE(review): presumably `K` and `V` are the key and value types of the
/// table being browsed (see [`NeighborhoodDescrStream`] and
/// `Stream::neighborhood`) -- confirm against `NeighborhoodDescr`.
pub type NeighborhoodDescrBox<K, V> =
TypedBox<NeighborhoodDescr<K, V>, DynNeighborhoodDescr<DynData, DynData>>;
/// Stream in the root circuit that carries an optional
/// [`NeighborhoodDescrBox`].
///
/// This is the type of the `neighborhood_descr` argument of
/// `Stream::neighborhood`; per that method's documentation, a `None` value
/// disables the operator so that it outputs an empty neighborhood.
pub type NeighborhoodDescrStream<K, V> = Stream<RootCircuit, Option<NeighborhoodDescrBox<K, V>>>;
/// Typed batch produced by the neighborhood operator for an input batch type
/// `B`.
///
/// Each record is a `Tup2<i64, Tup2<B::Key, B::Val>>` paired with a unit `()`
/// and a [`ZWeight`]. The underlying dynamically typed representation is a
/// [`DynOrdZSet`] over the matching `DynPair` nesting.
///
/// NOTE(review): the leading `i64` is presumably the row index relative to
/// the anchor row described in the `Stream::neighborhood` docs -- confirm
/// against [`crate::operator::DynNeighborhood`].
///
/// See [`crate::operator::DynNeighborhood`].
pub type Neighborhood<B> = TypedBatch<
    Tup2<i64, Tup2<<B as BatchReader>::Key, <B as BatchReader>::Val>>,
    (),
    ZWeight,
    DynOrdZSet<
        DynPair<
            DynDataTyped<i64>,
            DynPair<
                <<B as BatchReader>::Inner as DynBatchReader>::Key,
                <<B as BatchReader>::Inner as DynBatchReader>::Val,
            >,
        >,
    >,
>;

/// Stream in the root circuit whose values are [`Neighborhood`] batches.
///
/// This is the return type of `Stream::neighborhood`. It is expressed in
/// terms of [`Neighborhood`] so the batch layout is spelled out only once.
pub type NeighborhoodStream<B> = Stream<RootCircuit, Neighborhood<B>>;
impl<B: IndexedZSet> Stream<RootCircuit, B> {
    /// Extracts a small contiguous window of rows (a [`Neighborhood`]) from
    /// the input table.
    ///
    /// The operator exists to let a UI display the contents of a table that
    /// is too large for the client to hold or receive in full. Instead of the
    /// whole table, the client tracks only as many rows as fit on the screen.
    /// Such a window, or _neighborhood_, is described by its center (the
    /// "anchor") together with the number of rows to include before and after
    /// the anchor (see [`NeighborhoodDescr`]). The operator emits a stream of
    /// [`Neighborhood`]s.
    ///
    /// NOTE: The operator assumes that the integral of the input stream
    /// contains no negative weights (which should normally hold) and may
    /// produce incorrect outputs otherwise.
    ///
    /// # Arguments
    ///
    /// * `self` - a stream of changes to an indexed Z-set.
    ///
    /// * `neighborhood_descr` - the neighborhood descriptor to evaluate at
    ///   every clock tick. A value of `None` disables the operator (it will
    ///   output an empty neighborhood).
    ///
    /// # Output
    ///
    /// A stream of changes to the neighborhood.
    ///
    /// Rows in the output neighborhood carry indexes in the range
    /// `-descr.before ..= descr.after - 1`. Index 0 denotes the anchor row,
    /// i.e., the first row of the input that is greater than or equal to
    /// `descr.anchor`. When no such row exists (every input row is smaller
    /// than the anchor), the neighborhood contains negative indexes only.
    ///
    /// The smallest index may be greater than `-descr.before` when fewer
    /// rows than requested precede the anchor, and the largest index may be
    /// smaller than `descr.after - 1` when the input does not contain
    /// `descr.after` rows from the anchor point onwards.
    #[track_caller]
    pub fn neighborhood(
        &self,
        neighborhood_descr: &NeighborhoodDescrStream<B::Key, B::Val>,
    ) -> NeighborhoodStream<B> {
        // Factories for the dynamically typed operator, instantiated for the
        // concrete key/value types of `B`.
        let dyn_factories = NeighborhoodFactories::<B::Inner>::new::<B::Key, B::Val>();

        // SAFETY: NOTE(review) -- this reinterprets the descriptor stream's
        // payload as the type `dyn_neighborhood` expects. Presumably sound
        // because `TypedBox` is layout-compatible with its dynamically typed
        // counterpart; confirm against `transmute_payload`'s safety contract.
        let dyn_descr = unsafe { neighborhood_descr.transmute_payload() };

        // Run the dynamically typed operator, then restore static typing on
        // its output stream.
        let dyn_output = self.inner().dyn_neighborhood(&dyn_factories, &dyn_descr);
        dyn_output.typed()
    }
}