1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
use crate::{
RootCircuit, Stream, ZWeight,
dynamic::DynPair,
operator::dynamic::sample::StreamSampleUniqueKeyValsFactories,
trace::BatchReaderFactories,
typed_batch::{DynVecZSet, IndexedZSetReader, TypedBatch},
};
pub use crate::operator::dynamic::sample::{MAX_QUANTILES, MAX_SAMPLE_SIZE, default_quantiles};
use crate::utils::Tup2;
impl<B> Stream<RootCircuit, B>
where
    B: IndexedZSetReader,
    B::Inner: Clone,
{
    /// Computes a uniform random sample of keys in `self`.
    ///
    /// On every clock tick, draws a random sample from the batch received
    /// on the input stream using
    /// [`BatchReader::sample_keys`](`crate::trace::BatchReader::sample_keys`).
    /// The sample size is driven by the `sample_size` stream; supply `0` on
    /// ticks where no sample is needed so the operator doesn't waste CPU
    /// cycles.
    ///
    /// Sample sizes are capped at [`MAX_SAMPLE_SIZE`]: any larger
    /// `sample_size` value is treated as `MAX_SAMPLE_SIZE`.
    ///
    /// The output is a Z-set of the sampled keys, each emitted with weight
    /// `1` irrespective of the key's weight or of how many values it maps
    /// to in the input batch.
    ///
    /// Note that this operator is not incremental: it samples the batch
    /// arriving at the current clock cycle, not the integral of the input
    /// stream. To sample the integral instead, call `integrate_trace()`
    /// before `stream_sample_keys()`.
    ///
    /// WARNING: by definition this operator produces non-deterministic
    /// outputs. As such it may not play well with most other DBSP operators
    /// and must be used with care.
    #[track_caller]
    pub fn stream_sample_keys(
        &self,
        sample_size: &Stream<RootCircuit, usize>,
    ) -> Stream<RootCircuit, TypedBatch<B::Key, (), ZWeight, DynVecZSet<B::DynK>>> {
        // Delegate to the dynamically-typed implementation, erasing and
        // re-attaching the static types at the boundary.
        self.inner()
            .dyn_stream_sample_keys(
                &BatchReaderFactories::new::<B::Key, (), ZWeight>(),
                sample_size,
            )
            .typed()
    }
    /// Computes a uniform random sample of `(key, value)` pairs in `self`,
    /// under the assumption that every key in `self` has exactly one value.
    ///
    /// Behaves like `self.map(|(k, v)| (k, v)).stream_sample_keys()`, but
    /// more efficiently.
    #[allow(clippy::type_complexity)]
    #[track_caller]
    pub fn stream_sample_unique_key_vals(
        &self,
        sample_size: &Stream<RootCircuit, usize>,
    ) -> Stream<
        RootCircuit,
        TypedBatch<Tup2<B::Key, B::Val>, (), ZWeight, DynVecZSet<DynPair<B::DynK, B::DynV>>>,
    > {
        // Same delegation pattern as `stream_sample_keys`, with factories
        // for (key, value) pairs instead of bare keys.
        self.inner()
            .dyn_stream_sample_unique_key_vals(
                &StreamSampleUniqueKeyValsFactories::new::<B::Key, B::Val>(),
                sample_size,
            )
            .typed()
    }
    /// Computes a set of keys that split all keys in `self` into
    /// `num_quantiles + 1` approximately equal-size quantiles.
    ///
    /// Internally this operator relies on
    /// [`stream_sample_keys`](`Self::stream_sample_keys`) to draw a uniform
    /// random sample of size `num_quantiles ^ 2`, and then keeps every
    /// `num_quantiles`'th element of that sample.
    ///
    /// Quantile counts are capped at [`MAX_QUANTILES`]: any larger
    /// `num_quantiles` value is treated as `MAX_QUANTILES`.
    ///
    /// The output is a Z-set containing at most `num_quantiles` keys, each
    /// emitted with weight `1` irrespective of the key's weight or of how
    /// many values it maps to in the input batch.
    ///
    /// Note that this operator is not incremental: it samples the batch
    /// arriving at the current clock cycle, not the integral of the input
    /// stream. To sample the integral instead, call `integrate_trace()`
    /// before `stream_key_quantiles()`.
    ///
    /// WARNING: this operator produces non-deterministic outputs, i.e.,
    /// feeding the same input twice can yield different results. As such it
    /// may not play well with most other DBSP operators and must be used
    /// with care.
    #[track_caller]
    pub fn stream_key_quantiles(
        &self,
        num_quantiles: &Stream<RootCircuit, usize>,
    ) -> Stream<RootCircuit, TypedBatch<B::Key, (), ZWeight, DynVecZSet<B::DynK>>> {
        // Key-only factories: the quantile output carries unit values.
        self.inner()
            .dyn_stream_key_quantiles(
                &BatchReaderFactories::new::<B::Key, (), ZWeight>(),
                num_quantiles,
            )
            .typed()
    }
    /// Computes a set of `(key, value)` pairs that split all tuples in
    /// `self` into `num_quantiles + 1` approximately equal-size quantiles,
    /// under the assumption that every key in `self` has exactly one value.
    ///
    /// Behaves like
    /// `self.map(|(k, v)| (k, v)).stream_unique_key_val_quantiles()`, but
    /// more efficiently.
    #[allow(clippy::type_complexity)]
    #[track_caller]
    pub fn stream_unique_key_val_quantiles(
        &self,
        num_quantiles: &Stream<RootCircuit, usize>,
    ) -> Stream<
        RootCircuit,
        TypedBatch<Tup2<B::Key, B::Val>, (), ZWeight, DynVecZSet<DynPair<B::DynK, B::DynV>>>,
    > {
        // Pair factories mirror `stream_sample_unique_key_vals`.
        self.inner()
            .dyn_stream_unique_key_val_quantiles(
                &StreamSampleUniqueKeyValsFactories::new::<B::Key, B::Val>(),
                num_quantiles,
            )
            .typed()
    }
}