pub struct Stream<C, D> { /* private fields */ }
A Stream<C, D> stores the output value of type D of an operator in a
circuit with type C.
Circuits are synchronous, meaning that each value is produced and consumed in the same clock cycle, so there can be at most one value in the stream at any time.
The value type D may be any type, although most Stream methods impose
additional requirements. Since a stream must yield one data item per clock
cycle, the rate at which data arrives is important to the choice of type.
If, for example, an otherwise scalar input stream might not have new data on
every cycle, an Option type could represent that, and to batch multiple
pieces of data in a single step, one might use Vec or another collection
type.
In practice, D is often a special collection type called an “indexed
Z-set”, represented as trait IndexedZSet. An indexed Z-set is
conceptually a set of (key, value, weight) tuples. Indexed Z-sets have a
specialization called a “non-indexed Z-set” (ZSet) that contains key
and weight only. Indexed and non-indexed Z-sets are both subtraits of a
higher-level Batch trait. Many operators on streams work only with an
indexed or non-indexed Z-set or another batch type as D.
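To make the (key, value, weight) model concrete, here is a toy sketch of an indexed Z-set as a plain map from (key, value) pairs to integer weights, with a helper that collapses it to a non-indexed Z-set. This is only an illustration of the concept; the names `IndexedZSet` and `to_non_indexed` here are local to the example, not the dbsp API.

```rust
use std::collections::HashMap;

/// Toy model of an indexed Z-set: a map from (key, value) pairs to
/// integer weights; records with weight 0 are simply absent.
pub type IndexedZSet = HashMap<(&'static str, &'static str), i64>;

/// Collapse an indexed Z-set to a non-indexed one by dropping values
/// and summing the weights of records that share a key.
pub fn to_non_indexed(z: &IndexedZSet) -> HashMap<&'static str, i64> {
    let mut out = HashMap::new();
    for (&(k, _v), &w) in z {
        *out.entry(k).or_insert(0) += w;
    }
    out
}

fn main() {
    let mut z: IndexedZSet = HashMap::new();
    z.insert(("alice", "engineer"), 2); // multiplicity 2 in a data stream
    z.insert(("alice", "manager"), 1);
    z.insert(("bob", "analyst"), 1);
    let flat = to_non_indexed(&z);
    assert_eq!(flat["alice"], 3);
    assert_eq!(flat["bob"], 1);
}
```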
§Data streams versus delta streams
A value in a Stream can represent data, or it can represent a delta (also
called an “update”). Most streams carry data types that could have either
meaning. In particular, a stream of indexed or non-indexed Z-sets can carry
either:
- In a stream of data, the weight indicates the multiplicity of a key. A negative weight has no natural interpretation and might indicate a bug.
- In a stream of deltas or updates, a positive weight represents insertions and a negative weight represents deletions.
There’s no way to distinguish a data stream from a delta stream from just
the type of the Stream since, as described above, a stream of Z-sets can
be either one. Some operators make sense for either kind of stream; for
example, adding streams of Z-sets with plus works
equally well in either case, or even for adding a delta to data. But other
operations make sense only for streams of data or only for streams of
deltas. In these cases, Stream often provides an operator for each type
of stream, and the programmer must choose the right one, since the types
themselves don’t help.
Stream refers to operators specifically for streams of data as
“nonincremental”. These operators, which have stream in their name,
e.g. stream_join, take streams of data as input and produce one as output.
They act as “lifted scalar operators” that don’t maintain state across
invocations and only act on their immediate inputs, that is, each output is
produced by independently applying the operator to the individual inputs
received in the current step:
      ┌─────────────┐
a ───►│    non-     │
      │ incremental ├───► c
b ───►│  operator   │
      └─────────────┘
Stream refers to operators meant for streams of deltas as “incremental”.
These operators take streams of deltas as input and produce a stream of
deltas as output. Such operators could be implemented, inefficiently, in
terms of a nonincremental version by putting an integration operator before
each input and a differentiation operator after the output:
       ┌───┐   ┌─────────────┐
Δa ───►│ I ├──►│             │
       └───┘   │    non-     │   ┌───┐
               │ incremental ├──►│ D ├───► Δc
       ┌───┐   │  operator   │   └───┘
Δb ───►│ I ├──►│             │
       └───┘   └─────────────┘
§Operator naming convention
Stream uses _index and _generic suffixes and
stream_ prefix to declare variations of basic operations, e.g., map,
map_index, map_generic, map_index_generic, join, stream_join:
- stream_ prefix: This prefix indicates that the operator is “nonincremental”, that is, that it works with streams of data rather than streams of deltas (see Data streams versus delta streams, above).
- _generic suffix: Most operators can assemble their outputs into any collection type that implements the Batch trait. In practice, we typically use OrdIndexedZSet for indexed batches and OrdZSet for non-indexed batches. Methods without the _generic suffix return these concrete types, eliminating the need to type-annotate each invocation, while _generic methods can be used to return arbitrary custom Batch implementations.
- _index suffix: Methods without the _index suffix return non-indexed batches. <method>_index methods combine the effects of <method> and index, e.g., stream.map_index(closure) is functionally equivalent to, but more efficient than, stream.map(closure).index().
§Catalog of stream operators
Stream methods are the main way to perform
computations with streams. The number of available methods can be
overwhelming, so the subsections below categorize them into functionally
related groups.
§Input operators
Most streams are obtained via methods or traits that operate on Streams.
The input operators create the initial input streams for these other methods
to work with.
Circuit::add_source is the fundamental way to add an input stream.
Using it directly makes sense for cases like generating input using a
function (perhaps using Generator) or reading data from a file. More
commonly, RootCircuit offers the add_input_* functions as convenience
wrappers for add_source. Each one returns a tuple of:
- A Stream that can be attached as input to operators in the circuit (within the constructor function passed to RootCircuit::build only).
- An input handle that can be used to add input to the stream from outside the circuit. In a typical scenario, the closure passed to build will return all of its input handles, which are used at runtime to feed new inputs to the circuit at each step. Different functions return different kinds of input handles.
Use RootCircuit::add_input_indexed_zset or
RootCircuit::add_input_zset to create an (indexed) Z-set input
stream. There’s also RootCircuit::add_input_set and
RootCircuit::add_input_map to simplify cases where a regular set or
map is easier to use than a Z-set. The latter functions maintain an extra
internal table tracking the contents of the set or map, so they’re a second
choice.
For special cases, there’s also RootCircuit::add_input_stream<T>. The
InputHandle that it returns needs to be told which workers to feed the
data to, which makes it harder to use. It might be useful for feeding
non-relational data to the circuit, such as the current physical time. DBSP
does not know how to automatically distribute such values across workers,
so the caller must decide whether to send the value to one specific worker
or to broadcast it to everyone.
It’s common to pass explicit type arguments to the functions that create input streams, e.g.:
circuit.add_input_indexed_zset::<KeyType, ValueType, WeightType>()
§Output and debugging operators
There’s not much value in computations whose output can’t be seen in the
outside world. Use Stream::output to obtain an OutputHandle that
exports a stream’s data. The constructor function passed to
RootCircuit::build should return the OutputHandle (in addition to all
the input handles as described above). After each step, the client code
should take the new data from the OutputHandle, typically by calling
OutputHandle::consolidate.
Use Stream::inspect to apply a callback function to each data item in a
stream. The callback function has no return value and is executed only for
its side effects, such as printing the data item to stdout. The inspect
operator yields the same stream on its output.
It is not an operator, but Circuit::region can help with debugging by
grouping operators into a named collection.
§Record-by-record mapping operators
These operators map one kind of batch to another, allowing the client to pass in a function that looks at individual records in a Z-set or other batch. These functions apply to both streams of deltas and streams of data.
The following methods are available for streams of indexed and non-indexed Z-sets. Each of these takes a function that accepts a key (for non-indexed Z-sets) or a key-value pair (for indexed Z-sets):
- Use Stream::map to output a non-indexed Z-set using an arbitrary mapping function, or Stream::map_index to map to an indexed Z-set.
- Use Stream::filter to drop records that do not satisfy a predicate function. The output stream has the same type as the input.
- Use Stream::flat_map to output a Z-set that maps each input record to any number of records, or Stream::flat_map_index for indexed Z-set output. These methods also work as a filter_map equivalent.
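The record-by-record semantics can be sketched on the toy Z-set model: a map applies its function to each key and adds the weights of records that collide in the output, while a filter keeps or drops whole records. The function names here are illustrative, not dbsp methods.

```rust
use std::collections::HashMap;

/// Record-by-record map over a toy non-indexed Z-set: the mapping
/// function is applied to each key, and weights of records that map to
/// the same output key are added together.
pub fn zset_map<F>(z: &HashMap<i64, i64>, f: F) -> HashMap<i64, i64>
where
    F: Fn(i64) -> i64,
{
    let mut out = HashMap::new();
    for (&k, &w) in z {
        *out.entry(f(k)).or_insert(0) += w;
    }
    out
}

/// Filter keeps a record (with its weight) only if the predicate holds.
pub fn zset_filter<P>(z: &HashMap<i64, i64>, p: P) -> HashMap<i64, i64>
where
    P: Fn(i64) -> bool,
{
    z.iter().filter(|(&k, _)| p(k)).map(|(&k, &w)| (k, w)).collect()
}

fn main() {
    let z: HashMap<i64, i64> = HashMap::from([(1, 1), (2, 1), (-2, 1)]);
    // Mapping through abs() merges 2 and -2 into one record of weight 2.
    let mapped = zset_map(&z, |k| k.abs());
    assert_eq!(mapped[&2], 2);
    let filtered = zset_filter(&z, |k| k > 0);
    assert_eq!(filtered.len(), 2);
}
```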
§Value-by-value mapping operators
Sometimes it makes sense to map a stream’s whole data item instead of breaking Z-sets down into records. Unlike the record-by-record functions, these functions also work with streams that carry a type other than a Z-set or batch. These functions apply to both streams of deltas and streams of data.
Use Stream::apply to apply a mapping function to each item in a given
stream. Stream::apply_named, Stream::apply_owned, and
Stream::apply_owned_named offer small variations.
Use Stream::apply2 or Stream::apply2_owned to apply a mapping
function to pairs of items drawn from two different input streams.
§Addition and subtraction operators
Arithmetic operators work on Z-sets (and batches) by operating on weights. For example, adding two Z-sets adds the weights of records with the same key-value pair. They also work on streams of arithmetic types.
Use Stream::neg to map a stream to its negation, that is, to negate the
weights for a Z-set.
Use Stream::plus to add two streams and Stream::sum to add an
arbitrary number of streams. Use Stream::minus to subtract streams.
There aren’t any multiplication or division operators, because there is no
clear interpretation of them for Z-sets. You can use Stream::apply and
Stream::apply2, as already described, to do arbitrary arithmetic on one
or two streams of arithmetic types.
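On the toy Z-set model, addition sums weights per key (dropping records whose weight cancels to zero) and negation flips every weight; adding a Z-set to its negation therefore yields the empty Z-set. These helper names are local to the sketch, not dbsp methods.

```rust
use std::collections::HashMap;

/// Add two toy Z-sets by summing weights per key, dropping zero weights.
pub fn zset_plus(
    a: &HashMap<&'static str, i64>,
    b: &HashMap<&'static str, i64>,
) -> HashMap<&'static str, i64> {
    let mut out = a.clone();
    for (&k, &w) in b {
        *out.entry(k).or_insert(0) += w;
    }
    out.retain(|_, w| *w != 0);
    out
}

/// Negate a Z-set by negating every weight.
pub fn zset_neg(a: &HashMap<&'static str, i64>) -> HashMap<&'static str, i64> {
    a.iter().map(|(&k, &w)| (k, -w)).collect()
}

fn main() {
    let data = HashMap::from([("x", 2), ("y", 1)]);
    let delta = HashMap::from([("x", -1), ("z", 1)]);
    // Adding a delta to data: one "x" deleted, one "z" inserted.
    let updated = zset_plus(&data, &delta);
    assert_eq!(updated["x"], 1);
    assert_eq!(updated["z"], 1);
    // a + (-a) is the empty Z-set.
    assert!(zset_plus(&data, &zset_neg(&data)).is_empty());
}
```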
§Stream type conversion operators
These operators convert among streams of deltas, streams of data, and streams of “upserts”. Client code can use them, but they’re more often useful for testing (for example, for checking that incremental operators are equivalent to non-incremental ones) or for building other operators.
Use Stream::integrate to sum up the values within an input stream. The
first output value is the first input value, the second output value is the
sum of the first two inputs, and so on. This effectively converts a stream
of deltas into a stream of data.
Stream::stream_fold generalizes integration. On each step, it calls a
function to fold the input value with an accumulator and outputs the
accumulator. The client provides the function and the initial value of the
accumulator.
Use Stream::differentiate to calculate differences between subsequent
values in an input stream. The first output is the first input value, the
second output is the second input value minus the first input value, and so
on. This effectively converts a stream of data into a stream of deltas.
You shouldn’t ordinarily need this operator, at least not for Z-sets,
because DBSP operators are fully incremental.
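Over a stream of plain integers, integration and differentiation reduce to running sums and successive differences, and each inverts the other. A minimal sketch (standalone functions, not the dbsp operators themselves):

```rust
/// Integrate: running sums. Converts a stream of deltas to a stream of data.
pub fn integrate(deltas: &[i64]) -> Vec<i64> {
    deltas
        .iter()
        .scan(0, |acc, d| {
            *acc += d;
            Some(*acc)
        })
        .collect()
}

/// Differentiate: successive differences. Converts data back to deltas.
pub fn differentiate(data: &[i64]) -> Vec<i64> {
    let mut prev = 0;
    data.iter()
        .map(|&x| {
            let d = x - prev;
            prev = x;
            d
        })
        .collect()
}

fn main() {
    let deltas = [3, -1, 4, 0];
    let data = integrate(&deltas);
    assert_eq!(data, vec![3, 2, 6, 6]);
    // differentiate is the inverse of integrate.
    assert_eq!(differentiate(&data), deltas.to_vec());
}
```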
Use Stream::upsert to convert a stream of “upserts” into a stream of
deltas. The input stream carries “upserts”, or assignments of values to
keys such that a subsequent assignment to a key assigned earlier replaces
the earlier value. upsert turns these into a stream of deltas by
internally tracking upserts that have already been seen.
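The upsert-to-delta conversion can be sketched as follows: a table of previously seen values turns each assignment into a retraction of the old value (weight -1, if any) plus an insertion of the new one (weight +1). The function and its triple-based delta representation are illustrative only.

```rust
use std::collections::HashMap;

/// Convert one step's upserts into deltas, given the table of values
/// seen so far. An upsert (k, v) retracts the key's previous value (if
/// any) with weight -1 and inserts the new one with weight +1.
pub fn upsert_to_deltas(
    state: &mut HashMap<&'static str, i64>,
    upserts: &[(&'static str, i64)],
) -> Vec<(&'static str, i64, i64)> {
    let mut deltas = Vec::new();
    for &(k, v) in upserts {
        if let Some(old) = state.insert(k, v) {
            deltas.push((k, old, -1)); // delete the earlier value
        }
        deltas.push((k, v, 1)); // insert the new one
    }
    deltas
}

fn main() {
    let mut state = HashMap::new();
    assert_eq!(upsert_to_deltas(&mut state, &[("x", 10)]), vec![("x", 10, 1)]);
    // A later upsert to "x" replaces the earlier value.
    assert_eq!(
        upsert_to_deltas(&mut state, &[("x", 20)]),
        vec![("x", 10, -1), ("x", 20, 1)]
    );
}
```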
§Weighting and counting operators
Use Stream::dyn_weigh to multiply the weights in an indexed Z-set by a
user-provided function of the key and value. This is equally appropriate
for streams of data or deltas. This method outputs a non-indexed Z-set with
just the keys from the input, discarding the values, which also means that
weights will be added together in the case of equal input keys.
Stream provides methods to count the number of values per key in an
indexed Z-set:
- To sum the weights for each value within a key, use Stream::dyn_weighted_count for streams of deltas or Stream::dyn_stream_weighted_count for streams of data.
- To count each value only once, even for a weight greater than one, use Stream::dyn_distinct_count for streams of deltas or Stream::dyn_stream_distinct_count for streams of data.
The “distinct” operator on a Z-set maps positive weights to 1 and all other
weights to 0. Stream has two implementations:
- Use Stream::distinct to incrementally process a stream of deltas. If the output stream were to be integrated, it would contain only records with weight 0 or 1. This operator internally integrates the stream of deltas, which means its memory consumption is proportional to the integrated data size.
- Use Stream::stream_distinct to non-incrementally process a stream of data. It sets each record’s weight to 1 if it is positive and drops the others.
§Join on equal keys
A DBSP equi-join takes batches a and b as input, finds all pairs of a
record in a and a record in b with the same key, applies a given
function F to those records’ key and values, and outputs a Z-set with
F’s output.
DBSP implements two kinds of joins:
- Joins of delta streams (“incremental” joins) for indexed Z-sets with the same key type. Use Stream::join for non-indexed Z-set output, or Stream::join_index for indexed Z-set output.
- Joins of data streams (“nonincremental” joins), which work with any indexed batches. Use Stream::stream_join, which outputs a non-indexed Z-set. stream_join also works for joining a stream of deltas with an invariant stream of data, where the latter is used as a lookup table. If the output of the join function grows monotonically as (k, v1, v2) tuples are fed to it in lexicographic order, then Stream::monotonic_stream_join is more efficient. One such monotonic function is a join function that returns (k, v1, v2) itself.
One way to implement a Cartesian product is to map unindexed Z-set inputs
into indexed Z-sets with a unit key type, e.g. input.index_with(|k| ((), k)), and then use join or stream_join, as appropriate.
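The Z-set equi-join semantics underlying these operators can be sketched directly: records with equal keys pair up, their weights multiply, and a join function combines key and both values. The nested-loop implementation below is for illustration only (real joins use indexes), and the `join` name is local to the sketch.

```rust
use std::collections::HashMap;

/// Equi-join of two toy indexed Z-sets, each a map from (key, value) to
/// weight. Matching records multiply weights; here the join function
/// simply forms the (key, left value, right value) triple.
pub fn join(
    a: &HashMap<(i64, &'static str), i64>,
    b: &HashMap<(i64, &'static str), i64>,
) -> HashMap<(i64, &'static str, &'static str), i64> {
    let mut out = HashMap::new();
    for (&(k1, v1), &w1) in a {
        for (&(k2, v2), &w2) in b {
            if k1 == k2 {
                *out.entry((k1, v1, v2)).or_insert(0) += w1 * w2;
            }
        }
    }
    out
}

fn main() {
    let a = HashMap::from([((1, "left"), 1), ((2, "only-a"), 1)]);
    let b = HashMap::from([((1, "right"), 2)]);
    let joined = join(&a, &b);
    // Key 1 matches with weight 1 * 2; key 2 has no partner.
    assert_eq!(joined[&(1, "left", "right")], 2);
    assert_eq!(joined.len(), 1);
}
```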
§Other kinds of joins
Use Stream::antijoin for antijoins of delta streams. It takes indexed
Z-set a and Z-set b with the same key type and yields the subset of a
whose keys do not appear in b. b may be indexed or non-indexed and its
value type does not matter.
Use Stream::dyn_semijoin_stream for semi-joins of data streams. It
takes a batch a and non-indexed batch b with the same key type as a.
It outputs a non-indexed Z-set of key-value tuples that contains all the
pairs from a for which a key appears in b.
Use Stream::outer_join or Stream::outer_join_default for outer joins
of delta streams. The former takes three functions, one for each of the
cases (common keys, left key only, right key only), and the latter
simplifies it by taking only a function for common keys and passing in the
default for the missing value.
DBSP implements “range join” of data streams, which joins keys in a
against ranges of keys in b. Stream::dyn_stream_join_range implements
range join with non-indexed Z-set output,
Stream::dyn_stream_join_range_index with indexed output.
§Aggregation
Aggregation applies a function (the “aggregation function”) to all of the values for a given key in an input stream, and outputs an indexed Z-set with the same keys as the input and the function’s output as values. The output of aggregation usually has fewer records than its input, because it outputs only one record per input key, regardless of the number of key-value pairs with that key.
DBSP implements two kinds of aggregation:
- Stream::dyn_aggregate aggregates delta streams. It takes an aggregation function as an Aggregator, e.g. Min, Max, Fold, or one written by the client. Stream::dyn_aggregate_linear is cheaper for linear aggregation functions. It’s also a little easier to use with a custom aggregation function, because it only takes a function rather than an Aggregator object. Stream::dyn_average calculates the average over the values for each key.
- Stream::dyn_stream_aggregate aggregates data streams. Each batch from the input is separately aggregated and written to the output stream. Stream::dyn_stream_aggregate_linear applies a linear aggregation function to a data stream.
These aggregation functions all partition the aggregation by key, like GROUP
BY in SQL. To aggregate all records in a non-indexed Z-set, map to an
indexed Z-set with a unit key () before aggregating, then map again to
remove the index if necessary, e.g.:
let max_auction_count = auction_counts
    .map_index(|(_auction, count)| ((), *count))
    .aggregate(Max)
    .map(|((), max_count)| *max_count);
§Rolling aggregates
DBSP supports rolling aggregation of time series data over a
client-specified “rolling window” range. For this purpose, Rust unsigned
integer types model times, larger integers corresponding to later times.
The unit of time in use is relevant only for specifying the width of the
aggregation window, with RelRange.
The DBSP logical time concept is unrelated to times used in rolling aggregation and other time-series operators. The former is used to establish the ordering in which updates are consumed by DBSP, while the latter model physical times when the corresponding events were generated, observed, or processed. In particular, the ordering of physical and logical timestamps doesn’t have to match. In other words, DBSP can process events out-of-order.
Rolling aggregation takes place within a “partition”, which is any
convenient division of the data. It might correspond to a tenant ID, for
example, if each tenant’s data is to be separately aggregated. To represent
partitioning, rolling aggregation introduces the
OrdPartitionedIndexedZSet
type, which is an IndexedZSet with an arbitrary key type that specifies
the partition (it may be () if all data is to be within a single
partition) and a value type of the form (TS, V) where TS is the type
used for time and V is the client’s value type.
Rolling aggregation does not reduce the size of data. It outputs one record for each input record.
DBSP has two kinds of rolling aggregation functions that differ based on their tolerance for updating aggregation results when new data arrives for an old moment in time:
- If the application must tolerate data arriving entirely out-of-order, use Stream::partitioned_rolling_aggregate. It operates on a PartitionedIndexedZSet and takes an Aggregator and a RelRange that specifies the span of the window. It returns another PartitionedIndexedZSet with the results. This operator must buffer old data indefinitely, since old output is always subject to revision. Stream::partitioned_rolling_aggregate_linear is cheaper for linear aggregation functions. Stream::partitioned_rolling_average calculates the rolling average over a partition.
- If the application can discard data that arrives too far out-of-order, use Stream::partitioned_rolling_aggregate_with_waterline, which can be more memory-efficient. This form of rolling aggregation requires a “waterline” stream: a stream of times (scalars, not batches or Z-sets) that reports the earliest time that can be updated. Use Stream::waterline_monotonic to conveniently produce the waterline stream. Stream::partitioned_rolling_aggregate_with_waterline operates on an IndexedZSet and, in addition to the aggregator, range, and waterline stream, it takes a function to map a record to a partition. It discards input before the waterline, partitions it, aggregates it, and returns the result as a PartitionedIndexedZSet.
§Windowing
Use Stream::dyn_window to extract a stream of deltas to windows from a
stream of deltas. This can be useful for windowing outside the context of
rolling aggregation.
§Implementations
impl<C: Clone, B: DynBatchReader> Stream<C, B>
pub fn typed<TB>(&self) -> Stream<C, TB>where
TB: BatchReader<Inner = B>,
impl<C: Circuit, T, D: ?Sized> Stream<C, TypedBox<T, D>>
pub fn inner_data(&self) -> Stream<C, Box<D>>
impl<C: Circuit, T: DBData, D: DataTrait + ?Sized> Stream<C, TypedBox<T, D>>
pub fn inner_typed(&self) -> Stream<C, T>
impl<C, D> Stream<C, D>
pub fn local_node_id(&self) -> NodeId
Returns local node id of the operator or subcircuit that writes to this stream.
If the stream originates in a subcircuit, returns the id of the subcircuit node.
pub fn origin_node_id(&self) -> &GlobalNodeId
Returns global id of the operator that writes to this stream.
If the stream originates in a subcircuit, returns id of the operator inside the subcircuit (or one of its subcircuits) that produces the contents of the stream.
pub fn stream_id(&self) -> StreamId
pub fn ptr_eq<D2>(&self, other: &Stream<C, D2>) -> bool
impl<C, D> Stream<C, D>where
C: Circuit,
pub fn with_value(circuit: C, node_id: NodeId, val: RefStreamValue<D>) -> Self
Create a stream out of an existing RefStreamValue with node_id as the source.
pub fn value(&self) -> RefStreamValue<D>
pub fn export(&self) -> Stream<C::Parent, D>
Export stream to the parent circuit.
Creates a stream in the parent circuit that contains the last value in
self when the child circuit terminates.
This method currently only works for streams connected to a feedback
Z1 operator and will panic for other streams.
pub fn set_label(&self, key: &str, val: &str) -> Self
Call set_label on the node that produces this stream.
pub fn get_label(&self, key: &str) -> Option<String>
Call get_label on the node that produces this stream.
pub fn set_persistent_id(&self, name: Option<&str>) -> Self
Set persistent id for the operator that produces this stream.
pub fn get_persistent_id(&self) -> Option<String>
Get persistent id for the operator that produces this stream.
impl<C, T> Stream<C, T>where
C: Circuit,
pub fn set_persistent_mir_id(&self, id: &str)
Sets id as both the MIR node id and persistent id of the
source node of the stream.
pub fn get_mir_node_id(&self) -> Option<String>
Get the MIR node id label if any.
impl<C, T1> Stream<C, T1>
pub fn apply<F, T2>(&self, func: F) -> Stream<C, T2>
Returns a stream that contains func(&x) for each x in self.
func cannot mutate captured state.
The operator will have a generic name for debugging and profiling
purposes. Use Stream::apply_named, instead, to give it a
specific name.
pub fn apply_mut<F, T2>(&self, func: F) -> Stream<C, T2>
Returns a stream that contains func(&x) for each x in self.
func can access and mutate captured state.
The operator will have a generic name for debugging and profiling
purposes. Use Stream::apply_mut_named, instead, to give it a
specific name.
pub fn apply_named<N, F, T2>(&self, name: N, func: F) -> Stream<C, T2>
Returns a stream that contains func(&x) for each x in self, giving
the operator the given name for debugging and profiling purposes.
func cannot mutate captured state.
pub fn apply_mut_named<N, F, T2>(&self, name: N, func: F) -> Stream<C, T2>
Returns a stream that contains func(&x) for each x in self, giving
the operator the given name for debugging and profiling purposes.
func can access and mutate captured state.
pub fn apply_owned<F, T2>(&self, func: F) -> Stream<C, T2>
Returns a stream that contains func(x) for each x in self.
func cannot mutate captured state.
The operator will have a generic name for debugging and profiling
purposes. Use Stream::apply_owned_named, instead, to give it a
specific name.
pub fn apply_mut_owned<F, T2>(&self, func: F) -> Stream<C, T2>
Returns a stream that contains func(x) for each x in self.
func can access and mutate captured state.
The operator will have a generic name for debugging and profiling
purposes. Use Stream::apply_mut_owned_named, instead, to give it a
specific name.
pub fn apply_owned_named<N, F, T2>(&self, name: N, func: F) -> Stream<C, T2>
Returns a stream that contains func(x) for each x in self, giving
the operator the given name for debugging and profiling purposes.
func cannot mutate captured state.
pub fn apply_mut_owned_named<N, F, T2>(&self, name: N, func: F) -> Stream<C, T2>
Returns a stream that contains func(x) for each x in self, giving
the operator the given name for debugging and profiling purposes.
func can access and mutate captured state.
pub fn apply_core<N, T2, O, B, F>(
    &self,
    name: N,
    owned: O,
    borrowed: B,
    fixpoint: F,
) -> Stream<C, T2>
Apply the ApplyCore operator to self with a custom name
impl<C, T1> Stream<C, T1>
pub fn apply3<F, T2, T3, T4>(
    &self,
    other1: &Stream<C, T2>,
    other2: &Stream<C, T3>,
    func: F,
) -> Stream<C, T4>
Apply a user-provided ternary function to inputs from three streams at each timestamp.
pub fn apply3_with_preference<F, T2, T3, T4>(
    &self,
    self_preference: OwnershipPreference,
    other1: (&Stream<C, T2>, OwnershipPreference),
    other2: (&Stream<C, T3>, OwnershipPreference),
    func: F,
) -> Stream<C, T4>
Apply a user-provided ternary function to inputs at each timestamp.
Allows the caller to specify the ownership preference for each input stream.
impl<C, IB> Stream<C, IB>
pub fn shard(&self) -> Stream<C, IB>
Shard batches across multiple worker threads based on keys.
§Theory
We parallelize processing across N worker threads by creating a
replica of the same circuit per thread and sharding data across
replicas. To ensure correctness (i.e., that the sum of outputs
produced by individual workers is equal to the output produced
by processing the entire dataset by one worker), sharding must satisfy
certain requirements determined by each operator. In particular,
for distinct and aggregate, all tuples that share the same key
must be processed by the same worker. For join, tuples from both
input streams with the same key must be processed by the same worker.
Other operators, e.g., filter and flat_map, impose no restrictions
on the sharding scheme: as long as each tuple in a batch is
processed by some worker, the correct result will be produced. This
is true for all linear operators.
The shard operator shards input batches based on the hash of the key,
making sure that tuples with the same key always end up at the same
worker. More precisely, the operator re-shards its input by
partitioning batches in the input stream of each worker based on the
hash of the key, distributing resulting fragments among peers
and re-assembling fragments at each peer:
        ┌──────────────────┐
worker1 │                  │
───────►├─────┬───────────►├──────►
        │     │            │
───────►├─────┴───────────►├──────►
worker2 │                  │
        └──────────────────┘
§Usage
Most users do not need to invoke shard directly (and doing so is
likely to lead to incorrect results unless you know exactly what you
are doing). Instead, each operator re-shards its inputs as
necessary, e.g., join applies shard to both of its
input streams, while filter consumes its input directly without
re-sharding.
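The core of any key-based sharding scheme is a hash function that maps every tuple with a given key to the same worker. A minimal sketch of that assignment (the `worker_for` helper is hypothetical, not part of dbsp):

```rust
use std::collections::hash_map::DefaultHasher;
use std::hash::{Hash, Hasher};

/// Assign a key to one of `n_workers` workers by hashing it, so every
/// tuple with the same key lands on the same worker.
pub fn worker_for<K: Hash>(key: &K, n_workers: usize) -> usize {
    let mut h = DefaultHasher::new();
    key.hash(&mut h);
    (h.finish() as usize) % n_workers
}

fn main() {
    let keys = ["alice", "bob", "carol", "alice"];
    let shards: Vec<usize> = keys.iter().map(|k| worker_for(k, 4)).collect();
    // Equal keys always shard to the same worker.
    assert_eq!(shards[0], shards[3]);
    assert!(shards.iter().all(|&w| w < 4));
}
```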
§Performance considerations
In the current implementation, the shard operator introduces a
synchronization barrier across all workers: its output at any worker
is only produced once input batches have been collected from all
workers. This limits the scalability since a slow worker (e.g., running
on a busy CPU core or sharing the core with other workers) or uneven
sharding can slow down the whole system and reduce gains from
parallelization.
pub fn shard_workers(&self, workers: Range<usize>) -> Stream<C, IB>
Shard batch across just the specified range of workers.
If workers contains just one worker, then Stream::gather is more
efficient.
impl<C, B> Stream<C, B>
pub fn accumulate(&self) -> Stream<C, Option<Spine<B>>>
Accumulate changes within a clock cycle in a spine.
Once per clock cycle, during flush, outputs a spine containing all input
changes accumulated since the previous flush; outputs None otherwise.
This operator is a key part of efficient processing of long transactions. It is used in conjunction with stateful operators like join, aggregate, distinct, etc., to supply all inputs comprising a transaction at once, avoiding computing mutually canceling changes.
Using Spine to accumulate changes ensures that during a long transaction changes
are pushed to storage and get compacted by background workers.
pub fn accumulate_with_enable_count(
    &self,
) -> (Stream<C, Option<Spine<B>>>, Arc<AtomicUsize>)
Like Self::accumulate, but also returns a reference to the enable count of the accumulator.
Used to instantiate accumulators for output connectors. See Accumulator::enable_count documentation.
pub fn accumulate_apply2<B2, F, T>(
&self,
other: &Stream<C, B2>,
func: F,
) -> Stream<C, Option<T>>where
B2: Batch,
F: Fn(TypedBatch<B::Key, B::Val, B::R, SpineSnapshot<B::Inner>>, TypedBatch<B2::Key, B2::Val, B2::R, SpineSnapshot<B2::Inner>>) -> T + 'static,
T: Clone + 'static,
impl<C, D> Stream<C, D>
pub fn condition<F>(&self, condition_func: F) -> Condition<C>
Attach a condition to a stream.
A Condition is a predicate on the value in the stream,
checked on each clock cycle, that can be used to terminate
the execution of the subcircuit (see
ChildCircuit::iterate_with_condition and
ChildCircuit::iterate_with_conditions).
impl<C, Z> Stream<C, Z>where
C: Circuit,
Z: IndexedZSet<DynK = DynData>,
Z::InnerBatch: Send,
<Z::Key as Deserializable>::ArchivedDeser: Ord,
pub fn weighted_count(&self) -> Stream<C, OrdIndexedZSet<Z::Key, ZWeight>>
Incrementally sums the weights for each key in self into an indexed Z-set
that maps from the original keys to the weights. Both the input and
output are streams of updates.
pub fn weighted_count_generic<O>(&self) -> Stream<C, O>
Like Self::dyn_weighted_count, but can return any batch type.
pub fn distinct_count(&self) -> Stream<C, OrdIndexedZSet<Z::Key, ZWeight>>
Incrementally, for each key in self, counts the number of unique
values having positive weights, and outputs it as an indexed Z-set
that maps from the original keys to the unique value counts. Both
the input and output are streams of updates.
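The counting rule can be modeled on a fully materialized indexed Z-set (plain Rust, no DBSP types; `distinct_count_model` is a hypothetical helper, not the incremental operator itself):

```rust
use std::collections::BTreeMap;

// Model of `distinct_count`: first consolidate weights per (key, value)
// pair, then count, for each key, the values whose total weight is positive.
fn distinct_count_model(zset: &[(&str, &str, i64)]) -> BTreeMap<String, i64> {
    let mut per_pair: BTreeMap<(String, String), i64> = BTreeMap::new();
    for (k, v, w) in zset {
        *per_pair.entry(((*k).to_string(), (*v).to_string())).or_insert(0) += w;
    }
    let mut out = BTreeMap::new();
    for ((k, _v), w) in per_pair {
        if w > 0 {
            *out.entry(k).or_insert(0) += 1;
        }
    }
    out
}
```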
Sourcepub fn distinct_count_generic<O>(&self) -> Stream<C, O>
pub fn distinct_count_generic<O>(&self) -> Stream<C, O>
Like Self::dyn_distinct_count, but can return any batch type.
Sourcepub fn stream_weighted_count(
&self,
) -> Stream<C, OrdIndexedZSet<Z::Key, ZWeight>>
pub fn stream_weighted_count( &self, ) -> Stream<C, OrdIndexedZSet<Z::Key, ZWeight>>
Non-incrementally sums the weights for each key in self into an indexed
Z-set that maps from the original keys to the weights. Both the
input and output are streams of data (not updates).
Sourcepub fn stream_weighted_count_generic<O>(&self) -> Stream<C, O>
pub fn stream_weighted_count_generic<O>(&self) -> Stream<C, O>
Like Self::dyn_stream_weighted_count, but can return any batch type.
Sourcepub fn stream_distinct_count(
&self,
) -> Stream<C, OrdIndexedZSet<Z::Key, ZWeight>>
pub fn stream_distinct_count( &self, ) -> Stream<C, OrdIndexedZSet<Z::Key, ZWeight>>
Non-incrementally, for each key in self, counts the number of unique
values having positive weights, and outputs it as an indexed Z-set
that maps from the original keys to the unique value counts. Both
the input and output are streams of data (not updates).
Sourcepub fn stream_distinct_count_generic<O>(&self) -> Stream<C, O>
pub fn stream_distinct_count_generic<O>(&self) -> Stream<C, O>
Like Self::dyn_stream_distinct_count, but can return any batch type.
Source§impl<C, D> Stream<C, D>
impl<C, D> Stream<C, D>
Sourcepub fn delta0<CC>(&self, subcircuit: &CC) -> Stream<CC, D>where
CC: Circuit<Parent = C>,
pub fn delta0<CC>(&self, subcircuit: &CC) -> Stream<CC, D>where
CC: Circuit<Parent = C>,
Import self from the parent circuit to subcircuit via the Delta0
operator.
See Delta0 operator documentation.
Sourcepub fn delta0_with_preference<CC>(
&self,
subcircuit: &CC,
input_preference: OwnershipPreference,
) -> Stream<CC, D>where
CC: Circuit<Parent = C>,
pub fn delta0_with_preference<CC>(
&self,
subcircuit: &CC,
input_preference: OwnershipPreference,
) -> Stream<CC, D>where
CC: Circuit<Parent = C>,
Like Self::delta0, but overrides the ownership
preference on the input stream with input_preference.
Source§impl<C, D> Stream<C, D>
impl<C, D> Stream<C, D>
Sourcepub fn differentiate(&self) -> Stream<C, D>
pub fn differentiate(&self) -> Stream<C, D>
Stream differentiation.
Computes the difference between current and previous value
of self: differentiate(a) = a - z^-1(a).
The first output is the first input value, the second output is the
second input value minus the first input value, and so on.
You shouldn’t ordinarily need this operator, at least not for streams of Z-sets, because most DBSP operators are fully incremental.
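The equation differentiate(a) = a - z^-1(a) can be sketched over a plain numeric stream (for Z-sets, subtraction negates the weights instead; `differentiate_model` is a hypothetical helper, not the library operator):

```rust
// Model of stream differentiation over scalars:
// out[0] = in[0], out[i] = in[i] - in[i-1].
fn differentiate_model(input: &[i64]) -> Vec<i64> {
    let mut prev = 0; // z^-1 emits zero before the first input
    input
        .iter()
        .map(|&x| {
            let delta = x - prev;
            prev = x;
            delta
        })
        .collect()
}
```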
Sourcepub fn differentiate_nested(&self) -> Stream<C, D>
pub fn differentiate_nested(&self) -> Stream<C, D>
Nested stream differentiation.
Source§impl<C, T, K, V, B> Stream<ChildCircuit<C, T>, TypedBatch<K, V, ZWeight, B>>where
C: Clone + 'static,
K: DBData + Erase<B::Key>,
V: DBData + Erase<B::Val>,
T: Timestamp,
B: DynIndexedZSet,
TypedBatch<K, V, ZWeight, B>: Checkpoint + SizeOf + NumEntries + Clone + 'static,
impl<C, T, K, V, B> Stream<ChildCircuit<C, T>, TypedBatch<K, V, ZWeight, B>>where
C: Clone + 'static,
K: DBData + Erase<B::Key>,
V: DBData + Erase<B::Val>,
T: Timestamp,
B: DynIndexedZSet,
TypedBatch<K, V, ZWeight, B>: Checkpoint + SizeOf + NumEntries + Clone + 'static,
Sourcepub fn accumulate_differentiate(
&self,
) -> Stream<ChildCircuit<C, T>, TypedBatch<K, V, ZWeight, B>>
pub fn accumulate_differentiate( &self, ) -> Stream<ChildCircuit<C, T>, TypedBatch<K, V, ZWeight, B>>
Accumulates changes within a clock cycle and differentiates accumulated changes across clock cycles.
Source§impl<C, D> Stream<C, D>where
C: Circuit + 'static,
D: Checkpoint + SizeOf + NumEntries + Neg<Output = D> + Clone + AddByRef + AddAssignByRef + NegByRef + Eq + 'static,
impl<C, D> Stream<C, D>where
C: Circuit + 'static,
D: Checkpoint + SizeOf + NumEntries + Neg<Output = D> + Clone + AddByRef + AddAssignByRef + NegByRef + Eq + 'static,
pub fn differentiate_with_initial_value(&self, initial: D) -> Stream<C, D>
Source§impl<C, D> Stream<C, D>where
C: Circuit,
D: Checkpoint + AddByRef + AddAssignByRef + Clone + Eq + HasZero + SizeOf + NumEntries + 'static,
impl<C, D> Stream<C, D>where
C: Circuit,
D: Checkpoint + AddByRef + AddAssignByRef + Clone + Eq + HasZero + SizeOf + NumEntries + 'static,
Sourcepub fn integrate(&self) -> Stream<C, D>
pub fn integrate(&self) -> Stream<C, D>
Integrate the input stream.
Computes the sum of values in the input stream. The first output value is the first input value, the second output value is the sum of the first two inputs, and so on.
§Examples
let circuit = RootCircuit::build(move |circuit| {
// Generate a stream of 1's.
let stream = circuit.add_source(Generator::new(|| 1));
stream.inspect(move |n| eprintln!("{n}"));
// Integrate the stream.
let integral = stream.integrate();
integral.inspect(move |n| eprintln!("{n}"));
let mut counter1 = 0;
eprintln!("{counter1}");
integral.inspect(move |n| {
counter1 += 1;
assert_eq!(*n, counter1)
});
let mut counter2 = 0;
integral.delay().inspect(move |n| {
assert_eq!(*n, counter2);
counter2 += 1;
});
Ok(())
})
.unwrap()
.0;
for _ in 0..5 {
circuit.transaction().unwrap();
}
The above example generates the following input/output mapping:
input: 1, 1, 1, 1, 1, ...
output: 1, 2, 3, 4, 5, ...
pub fn integrate_nested(&self) -> Stream<C, D>
Integrate stream of streams.
Computes the sum of nested streams, i.e., rather than integrating values
in each nested stream, this function sums up entire input streams
across all parent timestamps, where the sum of streams is defined as
a stream of point-wise sums of their elements: integral[i,j] = sum(input[k,j]), k<=i, where stream[i,j] is the value of stream
at time [i,j], i is the parent timestamp, and j is the child
timestamp.
Yields the sum element-by-element as the input stream is fed to the integral.
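The definition integral[i,j] = sum(input[k,j]), k<=i amounts to a cumulative sum of rows, which can be modeled directly (`integrate_nested_model` is a hypothetical plain-Rust sketch, not the library operator):

```rust
// Model of nested integration: input[i][j] is the value at parent time i,
// child time j; integral[i][j] = sum of input[k][j] for all k <= i.
fn integrate_nested_model(input: &[Vec<i64>]) -> Vec<Vec<i64>> {
    let mut acc: Vec<i64> = Vec::new();
    let mut out = Vec::new();
    for row in input {
        if acc.len() < row.len() {
            acc.resize(row.len(), 0);
        }
        for (j, x) in row.iter().enumerate() {
            acc[j] += x;
        }
        out.push(acc.clone());
    }
    out
}
```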
§Examples
Input stream (one row per parent timestamp):
1 2 3 4 0
1 1 1 1 1
2 2 2 0 0
Integral:
1 2 3 4 0
2 3 4 5 1
4 5 6 5 1
impl<C, T, K, V, R, B> Stream<ChildCircuit<C, T>, TypedBatch<K, V, R, B>>
Sourcepub fn accumulate_integrate(
&self,
) -> Stream<ChildCircuit<C, T>, TypedBatch<K, V, R, B>>
pub fn accumulate_integrate( &self, ) -> Stream<ChildCircuit<C, T>, TypedBatch<K, V, R, B>>
Integrate the input stream, updating the output once per clock tick.
Source§impl<T> Stream<RootCircuit, T>
impl<T> Stream<RootCircuit, T>
Sourcepub fn output(&self) -> OutputHandle<T>
pub fn output(&self) -> OutputHandle<T>
Create an output handle that makes the contents of self available
outside the circuit.
This API makes the result of the computation performed by the circuit
available to the outside world. At each clock cycle, the contents
of the stream is buffered inside the handle and can be read using
the OutputHandle API.
pub fn output_persistent(&self, persistent_id: Option<&str>) -> OutputHandle<T>
pub fn output_persistent_with_gid( &self, persistent_id: Option<&str>, ) -> (OutputHandle<T>, GlobalNodeId)
Sourcepub fn output_guarded(
&self,
guard: &Stream<RootCircuit, bool>,
) -> OutputHandle<T>
pub fn output_guarded( &self, guard: &Stream<RootCircuit, bool>, ) -> OutputHandle<T>
Create an output handle that makes the contents of self available
outside the circuit on demand.
This operator is similar to output, but it only
produces the output conditionally, when the value in the guard stream
is true. When guard is false, the output mailbox remains empty at
the end of the clock cycle, and OutputHandle::take_from_worker will
return None. This operator can be used to output a large collection,
such as an integral of a stream, on demand.
Source§impl<B> Stream<RootCircuit, B>
impl<B> Stream<RootCircuit, B>
Sourcepub fn accumulate_output(&self) -> OutputHandle<SpineSnapshot<B>>
pub fn accumulate_output(&self) -> OutputHandle<SpineSnapshot<B>>
Output operator that produces a single accumulated output per clock cycle.
pub fn accumulate_output_persistent( &self, persistent_id: Option<&str>, ) -> OutputHandle<SpineSnapshot<B>>
Sourcepub fn accumulate_output_persistent_with_gid(
&self,
persistent_id: Option<&str>,
) -> (OutputHandle<SpineSnapshot<B>>, Arc<AtomicUsize>, GlobalNodeId)
pub fn accumulate_output_persistent_with_gid( &self, persistent_id: Option<&str>, ) -> (OutputHandle<SpineSnapshot<B>>, Arc<AtomicUsize>, GlobalNodeId)
Returns:
- The output handle.
- The enable count of the accumulator. Can be used to enable/disable the accumulator.
- The global node ID of the output operator.
Source§impl<C, D> Stream<C, D>
impl<C, D> Stream<C, D>
Sourcepub fn plus(&self, other: &Stream<C, D>) -> Stream<C, D>
pub fn plus(&self, other: &Stream<C, D>) -> Stream<C, D>
Apply the Plus operator to self and other.
Adding two indexed Z-sets adds the weights of matching key-value pairs.
The stream type’s addition operation must be commutative.
§Examples
let circuit = RootCircuit::build(move |circuit| {
// Stream of non-negative values: 0, 1, 2, ...
let mut n = 0;
let source1 = circuit.add_source(Generator::new(move || {
let res = n;
n += 1;
res
}));
// Stream of non-positive values: 0, -1, -2, ...
let mut n = 0;
let source2 = circuit.add_source(Generator::new(move || {
let res = n;
n -= 1;
res
}));
// Compute pairwise sums of values in the stream; the output stream will contain zeros.
source1.plus(&source2).inspect(|n| assert_eq!(*n, 0));
Ok(())
})
.unwrap()
.0;
Source§impl<T> Stream<RootCircuit, T>where
T: Clone + 'static,
impl<T> Stream<RootCircuit, T>where
T: Clone + 'static,
Sourcepub fn stream_fold<A, F>(&self, init: A, fold_func: F) -> Stream<RootCircuit, A>
pub fn stream_fold<A, F>(&self, init: A, fold_func: F) -> Stream<RootCircuit, A>
Folds every element in the input stream into an accumulator and outputs the current value of the accumulator at every clock cycle.
§Arguments
init- initial value of the accumulator.fold_func- closure that computes the new value of the accumulator as a function of the previous value and the new input at each clock cycle.
pub fn stream_fold_persistent<A, F>( &self, persistent_id: Option<&str>, init: A, fold_func: F, ) -> Stream<RootCircuit, A>
Source§impl<C, D> Stream<C, D>
impl<C, D> Stream<C, D>
Sourcepub fn sum<'a, I>(&'a self, streams: I) -> Stream<C, D>where
I: IntoIterator<Item = &'a Self>,
pub fn sum<'a, I>(&'a self, streams: I) -> Stream<C, D>where
I: IntoIterator<Item = &'a Self>,
Apply the Sum operator to self and all streams in streams.
The first output is the sum of the first input from each input stream,
the second output is the sum of the second input from each input stream,
and so on.
Source§impl<C, D> Stream<C, D>where
C: Circuit,
impl<C, D> Stream<C, D>where
C: Circuit,
Sourcepub fn transaction_delay_with_initial_value(&self, initial: D) -> Stream<C, D>
pub fn transaction_delay_with_initial_value(&self, initial: D) -> Stream<C, D>
Applies TransactionZ1 operator to self.
Source§impl<C, D> Stream<C, D>where
C: Circuit,
impl<C, D> Stream<C, D>where
C: Circuit,
Source§impl<C, K, V, R, B> Stream<C, TypedBatch<K, V, R, B>>
impl<C, K, V, R, B> Stream<C, TypedBatch<K, V, R, B>>
Sourcepub fn accumulate_trace(
&self,
) -> Stream<C, TypedBatch<K, V, R, TimedSpine<B, C>>>
pub fn accumulate_trace( &self, ) -> Stream<C, TypedBatch<K, V, R, TimedSpine<B, C>>>
Record batches in self in a trace.
This operator labels each untimed batch in the stream with the current timestamp and adds it to a trace.
It updates the output trace once per transaction, on flush.
Sourcepub fn accumulate_trace_with_bound<T>(
&self,
lower_key_bound: TraceBound<B::Key>,
lower_val_bound: TraceBound<B::Val>,
) -> Stream<C, TypedBatch<K, V, R, TimedSpine<B, C>>>
pub fn accumulate_trace_with_bound<T>( &self, lower_key_bound: TraceBound<B::Key>, lower_val_bound: TraceBound<B::Val>, ) -> Stream<C, TypedBatch<K, V, R, TimedSpine<B, C>>>
Record batches in self in a trace with bounds lower_key_bound and
lower_val_bound.
┌──────────┐ ┌────────────────────────┐ trace
self ─────►│accumulate├───►│ AccumulateTraceAppend │───────────────┬────► output
└──────────┘ └────────────────────────┘ │
▲ │
│ │
│ ┌─────────────────┐ │ z1feedback
└──────────────┤AccumulateZ1Trace├◄────┘
delayed_trace └─────────────────┘
impl<C, B> Stream<C, B>
Sourcepub fn accumulate_integrate_trace_retain_keys<TS, RK>(
&self,
bounds_stream: &Stream<C, TypedBox<TS, DynData>>,
retain_key_func: RK,
)
pub fn accumulate_integrate_trace_retain_keys<TS, RK>( &self, bounds_stream: &Stream<C, TypedBox<TS, DynData>>, retain_key_func: RK, )
Applies a retainment policy to keys in the integral of self.
§Background
Relations that store time series data typically have the property that any new updates can only affect records with recent timestamps. Depending on how the relation is used in queries this might mean that, while records with older timestamps still exist in the relation, they cannot affect any future incremental computation and therefore don’t need to be stored.
§Design
We support two mechanisms to specify and eventually discard such unused records.
The first mechanism, exposed via the
accumulate_integrate_trace_with_bound
method, is only applicable when keys and/or values in the collection
are ordered by time. It allows each consumer of the trace to specify
a lower bound on the keys and values it is interested in. The
effective bound is the minimum of all bounds specified by individual
consumers.
The second mechanism, implemented by this method and the
accumulate_integrate_trace_retain_values
method, is more general and allows the caller to specify an
arbitrary condition on keys and values in the trace respectively.
Keys or values that don’t satisfy the condition are eventually
reclaimed by the trace. This mechanism is applicable to collections
that are not ordered by time. Hence it doesn’t require rearranging
the data in time order. Furthermore, it is applicable to collections
that contain multiple timestamp columns. Such multidimensional
timestamps only form a partial order.
Unlike the first mechanism, this mechanism only allows one global
condition to be applied to the stream. This bound affects all
operators that use the trace of the stream, i.e., call
integrate_trace (or trace in the root scope) on it. This includes
for instance join, aggregate, and distinct. All such operators
will reference the same instance of a trace. Therefore bounds
specified by this API must be based on a global analysis of the
entire program.
The two mechanisms described above interact in different ways for keys and values. For keys, the lower bound and the retainment condition are independent and can be active at the same time. Internally, they are enforced using different techniques. Lower bounds are enforced at essentially zero cost. The retention condition is more expensive, but more general.
For values, only one of the two mechanisms can be enabled for any given stream. Whenever a retainment condition is specified it supersedes any lower bounds constraints.
§Arguments
-
bounds_stream - This stream carries scalar values (i.e., single records, not Z-sets). The key retainment condition is defined relative to the last value received from this stream. Typically, this value represents the lowest upper bound of all partially ordered timestamps in self or some other stream, computed with the help of the waterline operator and adjusted by some constant offsets, dictated, e.g., by window sizes used in the queries and the maximal out-of-orderness of data in the input streams.
retain_key_func- given the value received from thebounds_streamat the last clock cycle and a key, returnstrueif the key should be retained in the trace andfalseif it should be discarded.
§Correctness
-
As discussed above, the retainment policy set using this method applies to all consumers of the trace. An incorrect policy may reclaim keys that are still needed by some of the operators, leading to incorrect results. Computing a correct retainment policy can be a subtle and error-prone task, which is probably best left to automatic tools like compilers.
-
The retainment policy set using this method only applies to
self, but not to any stream derived from it. In particular, if self is re-sharded using the shard operator, then it may be necessary to call integrate_trace_retain_keys on the resulting stream. In general, computing a correct retainment policy requires keeping track of:
- Streams that are sharded by construction, for which the shard operator is a no-op. For instance, the add_input_set and aggregate operators produce sharded streams.
- Operators that shard their input streams, e.g., join.
-
This method should be invoked at most once for a stream.
-
retain_key_funcmust be monotone in its first argument: for any timestampts1and keyksuch thatretain_key_func(ts1, k) = false, and for anyts2 >= ts1it must hold thatretain_key_func(ts2, k) = false, i.e., once a key is rejected, it will remain rejected as the bound increases.
Sourcepub fn accumulate_integrate_trace_retain_values<TS, RV>(
&self,
bounds_stream: &Stream<C, TypedBox<TS, DynData>>,
retain_value_func: RV,
)
pub fn accumulate_integrate_trace_retain_values<TS, RV>( &self, bounds_stream: &Stream<C, TypedBox<TS, DynData>>, retain_value_func: RV, )
Similar to
accumulate_integrate_trace_retain_keys,
but applies a retainment policy to values in the trace.
Sourcepub fn accumulate_integrate_trace_retain_values_last_n<TS, RV>(
&self,
bounds_stream: &Stream<C, TypedBox<TS, DynData>>,
retain_value_func: RV,
n: usize,
)
pub fn accumulate_integrate_trace_retain_values_last_n<TS, RV>( &self, bounds_stream: &Stream<C, TypedBox<TS, DynData>>, retain_value_func: RV, n: usize, )
Applies a retainment policy that keeps all values above the threshold
in bounds_stream and up to n latest values before the threshold.
Notifies the garbage collector that it should preserve all values that
satisfy the predicate and the last n values before the first value that
satisfies the predicate for each key. If no value associated with a key
satisfies the predicate, the last n values are preserved.
Used to garbage collect streams that need to preserve a fixed number of values below a waterline, regardless of how far in the past they are. Examples include the right-hand side of an asof join and inputs to top-k operators.
IMPORTANT: this method assumes that for each key in self, values are
sorted in such a way that once the retain_value_func predicate is
satisfied for a value, it is also satisfied for all subsequent values.
§Arguments
-
bounds_stream - This stream carries scalar values (i.e., single records, not Z-sets). The key retainment condition is defined relative to the last value received from this stream. Typically, this value represents the lowest upper bound of all partially ordered timestamps in self or some other stream, computed with the help of the waterline operator and adjusted by some constant offsets, dictated, e.g., by window sizes used in the queries and the maximal out-of-orderness of data in the input streams.
retain_value_func- given the value received from thebounds_streamat the last clock cycle and a value, returnstrueif the value should be retained in the trace andfalseif it should be discarded. -
n- the number of values to preserve.
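The policy for one key's (sorted) group of values can be sketched as follows; `retain_last_n_model` is a hypothetical plain-Rust helper illustrating the rule, not the garbage collector itself:

```rust
// Model of the last_n retainment policy for one key's values, which are
// assumed sorted so that once `retain` holds it holds for all later values:
// keep everything from `n` positions before the first retained value onward.
// If no value satisfies `retain`, only the last `n` values survive.
fn retain_last_n_model<T: Clone>(
    values: &[T],
    retain: impl Fn(&T) -> bool,
    n: usize,
) -> Vec<T> {
    let first = values.iter().position(|v| retain(v)).unwrap_or(values.len());
    values[first.saturating_sub(n)..].to_vec()
}
```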
Sourcepub fn accumulate_integrate_trace_retain_values_top_n<TS, RV>(
&self,
bounds_stream: &Stream<C, TypedBox<TS, DynData>>,
retain_value_func: RV,
n: usize,
)
pub fn accumulate_integrate_trace_retain_values_top_n<TS, RV>( &self, bounds_stream: &Stream<C, TypedBox<TS, DynData>>, retain_value_func: RV, n: usize, )
Applies a retainment policy that keeps all values above the threshold
in bounds_stream and up to n largest values below the threshold.
This is similar to accumulate_integrate_trace_retain_values_last_n, but
it does not assume that values in the group are sorted according to a timestamp.
Can be used to GC the MAX aggregate or top-k group transformers.
§Arguments
-
bounds_stream- This stream carries scalar values (i.e., single records, not Z-sets). The key retainment condition is defined relative to the last value received from this stream. Typically, this value represents the lowest upper bound of all partially ordered timestamps inselfor some other stream, computed with the help of thewaterlineoperator and adjusted by some constant offsets, dictated, e.g., by window sizes used in the queries and the maximal out-of-ordedness of data in the input streams. -
retain_value_func- given the value received from thebounds_streamat the last clock cycle and a value, returnstrueif the value should be retained in the trace andfalseif it should be discarded. -
n- the number of values to preserve.
Sourcepub fn accumulate_integrate_trace_retain_values_bottom_n<TS, RV>(
&self,
bounds_stream: &Stream<C, TypedBox<TS, DynData>>,
retain_value_func: RV,
n: usize,
)
pub fn accumulate_integrate_trace_retain_values_bottom_n<TS, RV>( &self, bounds_stream: &Stream<C, TypedBox<TS, DynData>>, retain_value_func: RV, n: usize, )
Similar to accumulate_integrate_trace_retain_values_top_n, but keeps the bottom n values.
Sourcepub fn accumulate_integrate_trace(&self) -> Stream<C, Spine<B>>
pub fn accumulate_integrate_trace(&self) -> Stream<C, Spine<B>>
Constructs and returns an untimed trace of this stream.
The trace is unbounded, meaning that data will not be discarded because
it has a low key or value. Filter functions set with
accumulate_integrate_trace_retain_keys or
accumulate_integrate_trace_retain_values
can still discard data.
The result batch is stored durably for fault tolerance.
Sourcepub fn accumulate_integrate_trace_with_bound(
&self,
lower_key_bound: TraceBound<<B::Inner as DynBatchReader>::Key>,
lower_val_bound: TraceBound<<B::Inner as DynBatchReader>::Val>,
) -> Stream<C, Spine<B>>
pub fn accumulate_integrate_trace_with_bound( &self, lower_key_bound: TraceBound<<B::Inner as DynBatchReader>::Key>, lower_val_bound: TraceBound<<B::Inner as DynBatchReader>::Val>, ) -> Stream<C, Spine<B>>
Constructs and returns an untimed trace of this stream.
Data in the trace with a key less than lower_key_bound or value less
than lower_val_bound can be discarded, although these bounds can be
lowered later (discarding less data). Filter functions set with
accumulate_integrate_trace_retain_keys or
accumulate_integrate_trace_retain_values
can still discard data.
The result batch is stored durably for fault tolerance.
Updates the output trace once per transaction, on flush.
Source§impl<C, Z> Stream<C, Z>where
C: Circuit,
impl<C, Z> Stream<C, Z>where
C: Circuit,
Sourcepub fn stream_aggregate<A>(
&self,
aggregator: A,
) -> Stream<C, OrdIndexedZSet<Z::Key, A::Output>>
pub fn stream_aggregate<A>( &self, aggregator: A, ) -> Stream<C, OrdIndexedZSet<Z::Key, A::Output>>
Aggregate values associated with each key in an indexed Z-set.
An indexed Z-set IndexedZSet<K, V, R> maps each key into a
set of (value, weight) tuples (V, R). These tuples form
a nested Z-set ZSet<V, R>. This method applies aggregator
to each such Z-set and adds it to the output indexed Z-set with
weight +1.
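With a concrete aggregator, the grouping step can be modeled in plain Rust (`stream_aggregate_model` is a hypothetical helper using sum-of-weighted-values as the aggregator; the real operator takes any Aggregator):

```rust
use std::collections::BTreeMap;

// Model of `stream_aggregate`: group the (value, weight) tuples by key,
// then apply an aggregator (here: sum of value * weight) to each group.
// Each resulting (key, aggregate) pair would enter the output Z-set with
// weight +1 (weights elided here).
fn stream_aggregate_model(zset: &[(&str, i64, i64)]) -> BTreeMap<String, i64> {
    let mut groups: BTreeMap<String, Vec<(i64, i64)>> = BTreeMap::new();
    for (k, v, w) in zset {
        groups.entry((*k).to_string()).or_default().push((*v, *w));
    }
    groups
        .into_iter()
        .map(|(k, vals)| (k, vals.iter().map(|(v, w)| v * w).sum()))
        .collect()
}
```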
Sourcepub fn stream_aggregate_generic<A, O>(&self, aggregator: A) -> Stream<C, O>where
Z: Batch<Time = ()>,
Z::InnerBatch: Send,
A: Aggregator<Z::Val, (), Z::R>,
A::Output: Erase<O::DynV>,
O: IndexedZSet<Key = Z::Key, DynK = Z::DynK, Val = A::Output>,
pub fn stream_aggregate_generic<A, O>(&self, aggregator: A) -> Stream<C, O>where
Z: Batch<Time = ()>,
Z::InnerBatch: Send,
A: Aggregator<Z::Val, (), Z::R>,
A::Output: Erase<O::DynV>,
O: IndexedZSet<Key = Z::Key, DynK = Z::DynK, Val = A::Output>,
Like Self::stream_aggregate, but can return any batch type.
Sourcepub fn stream_aggregate_linear<F, A>(
&self,
f: F,
) -> Stream<C, OrdIndexedZSet<Z::Key, A>>
pub fn stream_aggregate_linear<F, A>( &self, f: F, ) -> Stream<C, OrdIndexedZSet<Z::Key, A>>
A version of Self::dyn_stream_aggregate optimized for linear
aggregation functions.
This method only works for linear aggregation functions f, i.e.,
functions that satisfy f(a+b) = f(a) + f(b), where the first “+”
is the Z-set union of the Z-sets composed of tuples a and b.
This function will produce incorrect results if f is not linear.
The input stream is a Z-set of (key, value) pairs, but the function
only receives the “value” part as an input.
Sourcepub fn stream_aggregate_linear_generic<F, A, O>(&self, f: F) -> Stream<C, O>
pub fn stream_aggregate_linear_generic<F, A, O>(&self, f: F) -> Stream<C, O>
Like Self::stream_aggregate_linear, but can return any batch
type.
Sourcepub fn aggregate_generic<A, O>(&self, aggregator: A) -> Stream<C, O>where
Z: Batch<Time = ()> + Debug,
Z::InnerBatch: Send,
A: Aggregator<Z::Val, <C as WithClock>::Time, Z::R>,
O: IndexedZSet<Key = Z::Key, DynK = Z::DynK, Val = DynData>,
A::Output: Erase<O::DynV>,
<Z::Key as Deserializable>::ArchivedDeser: Ord,
pub fn aggregate_generic<A, O>(&self, aggregator: A) -> Stream<C, O>where
Z: Batch<Time = ()> + Debug,
Z::InnerBatch: Send,
A: Aggregator<Z::Val, <C as WithClock>::Time, Z::R>,
O: IndexedZSet<Key = Z::Key, DynK = Z::DynK, Val = DynData>,
A::Output: Erase<O::DynV>,
<Z::Key as Deserializable>::ArchivedDeser: Ord,
Like Self::dyn_aggregate, but can return any batch type.
Sourcepub fn aggregate_linear<F, A>(
&self,
f: F,
) -> Stream<C, OrdIndexedZSet<Z::Key, A>>where
Z: IndexedZSet<DynK = DynData>,
A: DBWeight + MulByRef<ZWeight, Output = A>,
F: Fn(&Z::Val) -> A + Clone + 'static,
<Z::Key as Deserializable>::ArchivedDeser: Ord,
pub fn aggregate_linear<F, A>(
&self,
f: F,
) -> Stream<C, OrdIndexedZSet<Z::Key, A>>where
Z: IndexedZSet<DynK = DynData>,
A: DBWeight + MulByRef<ZWeight, Output = A>,
F: Fn(&Z::Val) -> A + Clone + 'static,
<Z::Key as Deserializable>::ArchivedDeser: Ord,
A version of Self::aggregate optimized for linear
aggregation functions.
This method only works for linear aggregation functions f, i.e.,
functions that satisfy f(a+b) = f(a) + f(b), where the first “+”
is the Z-set union of the Z-sets composed of tuples a and b.
This function will produce incorrect results if f is not linear.
The input of aggregate_linear is an indexed Z-set, but the function f is
only applied to the values, ignoring the keys.
Sourcepub fn weigh<F, T>(&self, f: F) -> Stream<C, OrdWSet<Z::Key, T, DynWeight>>
pub fn weigh<F, T>(&self, f: F) -> Stream<C, OrdWSet<Z::Key, T, DynWeight>>
Convert indexed Z-set Z into a Z-set where the weight of each key
is computed as:
__
╲
╱ f(k,v) * w
‾‾
(k,v,w) ∈ Z
Discards the values from the input.
This is a linear operator.
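The summation above can be modeled directly (`weigh_model` is a hypothetical plain-Rust helper, not part of the crate):

```rust
use std::collections::BTreeMap;

// Model of `weigh`: collapse an indexed Z-set into a weighted set whose
// weight for key k is the sum of f(k, v) * w over all (k, v, w) tuples.
fn weigh_model(
    zset: &[(&str, i64, i64)],
    f: impl Fn(&str, i64) -> i64,
) -> BTreeMap<String, i64> {
    let mut out: BTreeMap<String, i64> = BTreeMap::new();
    for (k, v, w) in zset {
        *out.entry((*k).to_string()).or_insert(0) += f(*k, *v) * w;
    }
    out
}
```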
Sourcepub fn weigh_generic<F, T, O>(&self, f: F) -> Stream<C, O>
pub fn weigh_generic<F, T, O>(&self, f: F) -> Stream<C, O>
Like Self::weigh, but can return any batch type.
Source§impl<K, V> Stream<RootCircuit, OrdIndexedZSet<K, V>>
impl<K, V> Stream<RootCircuit, OrdIndexedZSet<K, V>>
Sourcepub fn aggregate_linear_postprocess_retain_keys<F, A, OF, OV, TS, RK>(
&self,
waterline: &Stream<RootCircuit, TypedBox<TS, DynData>>,
retain_key_func: RK,
f: F,
of: OF,
) -> Stream<RootCircuit, OrdIndexedZSet<K, OV>>
pub fn aggregate_linear_postprocess_retain_keys<F, A, OF, OV, TS, RK>( &self, waterline: &Stream<RootCircuit, TypedBox<TS, DynData>>, retain_key_func: RK, f: F, of: OF, ) -> Stream<RootCircuit, OrdIndexedZSet<K, OV>>
Like aggregate_linear_postprocess, but additionally applies waterline to the internal integral
See aggregate_linear_retain_keys for details.
pub fn aggregate_linear_postprocess_retain_keys_persistent<F, A, OF, OV, TS, RK>( &self, persistent_id: Option<&str>, waterline: &Stream<RootCircuit, TypedBox<TS, DynData>>, retain_key_func: RK, f: F, of: OF, ) -> Stream<RootCircuit, OrdIndexedZSet<K, OV>>
Sourcepub fn aggregate_linear_retain_keys<F, A, TS, RK>(
&self,
waterline: &Stream<RootCircuit, TypedBox<TS, DynData>>,
retain_key_func: RK,
f: F,
) -> Stream<RootCircuit, OrdIndexedZSet<K, A>>
pub fn aggregate_linear_retain_keys<F, A, TS, RK>( &self, waterline: &Stream<RootCircuit, TypedBox<TS, DynData>>, retain_key_func: RK, f: F, ) -> Stream<RootCircuit, OrdIndexedZSet<K, A>>
Like aggregate_linear, but additionally applies waterline to the internal integral.
The linear aggregate operator internally invokes the regular aggregation operator, which creates two integrals: one each for the input and output streams. The latter is returned as the output of this operator and can be GC’d using regular means (integrate_trace_retain_keys), but the former integral is internal to this operator. When aggregating a stream that has a waterline, use this function to GC keys in the internal stream that fall below the waterline.
Source§impl<K1, V1> Stream<RootCircuit, OrdIndexedZSet<K1, V1>>
impl<K1, V1> Stream<RootCircuit, OrdIndexedZSet<K1, V1>>
Sourcepub fn asof_join<TS, F, TSF1, TSF2, V2, V>(
&self,
other: &Stream<RootCircuit, OrdIndexedZSet<K1, V2>>,
join: F,
ts_func1: TSF1,
ts_func2: TSF2,
) -> Stream<RootCircuit, OrdZSet<V>>
pub fn asof_join<TS, F, TSF1, TSF2, V2, V>( &self, other: &Stream<RootCircuit, OrdIndexedZSet<K1, V2>>, join: F, ts_func1: TSF1, ts_func2: TSF2, ) -> Stream<RootCircuit, OrdZSet<V>>
Asof-join operator.
An asof-join operator combines records from two tables based on a common key
(similar to an equi-join), as well as a timestamp. It assumes that both tables
contain a timestamp column (ts). It matches each value v in self with
the value in other that has the same key and the largest timestamp not
exceeding v.ts. If there are multiple values with the same timestamp, the
operator picks the largest one based on the ordering (according to Ord) on
type V2. If there is no value v2 in other such that v2.ts <= v.ts,
then the value None is used, i.e., this operator behaves as a left join.
The operator assumes that values in both collections are sorted by timestamp,
i.e., impl Ord for V1 must satisfy ts_func1(v) < ts_func1(u) ==> v < u.
Similarly for V2: ts_func2(v) < ts_func2(u) ==> v < u.
§Arguments
self- the left-hand side of the join.other- the right-hand side of the join.join- join function that maps a key, a value fromself, and an optional value fromotherto an output value.ts_func1- extracts the value of the timestamp column from a record inself.ts_func2- extracts the value of the timestamp column from a record inother.
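The matching rule for a single left-hand record can be sketched in plain Rust (`asof_match` is a hypothetical helper illustrating the semantics, not the incremental operator):

```rust
// Model of the asof-join matching rule: a left record (key, ts, v1) is
// paired with the right-hand value sharing its key that has the largest
// timestamp not exceeding ts, ties broken by the larger value (Ord on V2),
// or None if no such record exists (left-join behavior).
fn asof_match<'a>(
    right: &'a [(&'a str, u64, &'a str)],
    key: &str,
    ts: u64,
) -> Option<&'a str> {
    right
        .iter()
        .copied()
        .filter(|(k, t, _)| *k == key && *t <= ts)
        .max_by_key(|(_, t, v)| (*t, *v))
        .map(|(_, _, v)| v)
}
```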
Source§impl<C, Z> Stream<C, Z>
impl<C, Z> Stream<C, Z>
Sourcepub fn average<A, F>(&self, f: F) -> Stream<C, OrdIndexedZSet<Z::Key, A>>where
A: DBData + From<ZWeight> + MulByRef<ZWeight, Output = A> + Div<Output = A> + GroupValue,
F: Fn(&Z::Val) -> A + Clone + 'static,
<Z::Key as Deserializable>::ArchivedDeser: Ord,
pub fn average<A, F>(&self, f: F) -> Stream<C, OrdIndexedZSet<Z::Key, A>>where
A: DBData + From<ZWeight> + MulByRef<ZWeight, Output = A> + Div<Output = A> + GroupValue,
F: Fn(&Z::Val) -> A + Clone + 'static,
<Z::Key as Deserializable>::ArchivedDeser: Ord,
Incremental average aggregate.
This operator is a specialization of Stream::aggregate that for
each key k in the input indexed Z-set computes the average value as:
  __               __
  ╲                ╲
  ╱ v * w    /     ╱ w
  ‾‾               ‾‾
(v,w) ∈ Z[k]     (v,w) ∈ Z[k]
§Design
Average is a quasi-linear aggregate, meaning that it can be efficiently
computed as a composition of two linear aggregates: sum and count.
The (sum, count) pair with pair-wise operations is also a linear
aggregate and can be computed with a single
Stream::aggregate_linear operator. The actual average is
computed by applying the (sum, count) -> sum / count
transformation to its output.
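The (sum, count) decomposition described above can be modeled for a single group (`average_model` is a hypothetical plain-Rust helper; it uses integer division for simplicity):

```rust
// Model of the quasi-linear average: maintain the linear pair
// (sum, count) over (value, weight) tuples, then post-process
// with the (sum, count) -> sum / count transformation.
fn average_model(group: &[(i64, i64)]) -> Option<i64> {
    let (sum, count) = group
        .iter()
        .fold((0i64, 0i64), |(s, c), (v, w)| (s + v * w, c + w));
    if count == 0 { None } else { Some(sum / count) }
}
```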
impl<K, V> Stream<RootCircuit, OrdIndexedZSet<K, V>>
pub fn chain_aggregate<A, FInit, FUpdate>(
&self,
finit: FInit,
fupdate: FUpdate,
) -> Stream<RootCircuit, OrdIndexedZSet<K, A>>
Aggregate whose value depends on the previous value of the aggregate and changes to the input collection.
Unlike general aggregates, such aggregates don’t require storing the integral of the input collection and hence require only O(1) memory per key.
Examples include min and max over append-only collections.
§Arguments
- finit - returns the initial value of the aggregate.
- fupdate - updates the aggregate given its previous value and a new element of the group.
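The finit/fupdate contract can be sketched without the operator machinery. A minimal illustration, assuming an append-only group (insertions only, as the doc notes for min/max) and plain maps instead of Z-sets: a running max that keeps O(1) state per key.

```rust
use std::collections::HashMap;

// O(1)-per-key running max in the style of chain_aggregate:
// `finit` seeds the aggregate from the first element of a group,
// `fupdate` folds each further insertion into the previous value.
fn finit(v: i64) -> i64 {
    v
}

fn fupdate(acc: i64, v: i64) -> i64 {
    acc.max(v)
}

fn main() {
    let mut agg: HashMap<&str, i64> = HashMap::new();
    // Append-only input: (key, value) insertions, never deletions.
    for (k, v) in [("a", 3), ("a", 7), ("b", 1), ("a", 5)] {
        agg.entry(k)
            .and_modify(|acc| *acc = fupdate(*acc, v))
            .or_insert_with(|| finit(v));
    }
    assert_eq!(agg["a"], 7);
    assert_eq!(agg["b"], 1);
}
```

Note that this only works because the input is append-only: a deletion of the current max would force a rescan of the group, which is exactly the case requiring the full integral.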
pub fn chain_aggregate_persistent<A, FInit, FUpdate>( &self, persistent_id: Option<&str>, finit: FInit, fupdate: FUpdate, ) -> Stream<RootCircuit, OrdIndexedZSet<K, A>>
impl<C, T> Stream<C, T>
pub fn consolidate(
&self,
) -> Stream<C, TypedBatch<T::Key, T::Val, T::R, <T::InnerTrace as DynTrace>::Batch>>
Consolidate a trace into a single batch.
Each element in the input stream is a trace, consisting of multiple batches of updates. This operator consolidates the trace into a single batch, which uses less memory and can be handled more efficiently by most operators than the trace.
This operator is typically attached to the output of a nested circuit computed as the sum of deltas across all iterations of the circuit. Once the iteration has converged (e.g., reached a fixed point) is a good time to consolidate the output.
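Conceptually, consolidation merges all of a trace's batches and sums the weights of identical tuples. A minimal sketch over plain (key, weight) batches, not the real Spine/Batch types:

```rust
use std::collections::BTreeMap;

// Consolidate a "trace" of several batches into a single batch:
// merge all (key, weight) tuples, add up weights per key, and drop
// keys whose net weight is zero.
fn consolidate(trace: &[Vec<(&str, i64)>]) -> BTreeMap<String, i64> {
    let mut out = BTreeMap::new();
    for batch in trace {
        for &(k, w) in batch {
            *out.entry(k.to_string()).or_insert(0) += w;
        }
    }
    out.retain(|_, w| *w != 0);
    out
}

fn main() {
    // Deltas accumulated over several iterations of a nested circuit.
    let trace = vec![
        vec![("x", 1), ("y", 2)],
        vec![("x", 1), ("y", -2)], // fully retracts y
    ];
    let batch = consolidate(&trace);
    assert_eq!(batch.get("x"), Some(&2));
    assert_eq!(batch.get("y"), None);
}
```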
impl<C, Z> Stream<C, Z>where
C: Circuit,
Z: IndexedZSet,
pub fn controlled_key_filter<T, E, F, RF>(
&self,
threshold: &Stream<C, TypedBox<T, DynData>>,
filter_func: F,
report_func: RF,
) -> (Stream<C, Z>, Stream<C, OrdZSet<E>>)
A controlled filter operator that discards input keys by comparing them to a scalar
value in the threshold stream.
This operator compares each key in the input indexed Z-set against the current value in
the threshold stream using filter_func and discards keys that don’t pass the filter.
Returns a pair of output streams:
- Filtered output stream: (key, value, weight) tuples whose key passes the filter are sent to the first output stream unmodified.
- Error stream: keys that don’t pass the check are transformed by report_func and sent to the second output stream. More precisely, report_func is invoked for each (key, value, weight) tuple whose key does not pass the filter and returns a value of type E, which is sent with weight 1 to the error stream.
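The two-output contract can be sketched with ordinary collections. A minimal illustration, assuming plain (key, value, weight) tuples instead of Z-sets, a numeric threshold, and a hypothetical String error report:

```rust
// Controlled key filter over plain (key, value, weight) tuples:
// keys passing the filter (here: key >= threshold) go to the filtered
// output unmodified; the rest are mapped by a report function to an
// error value emitted with weight 1.
fn controlled_key_filter(
    input: &[(i64, &str, i64)],
    threshold: i64,
) -> (Vec<(i64, String, i64)>, Vec<(String, i64)>) {
    let mut passed = Vec::new();
    let mut errors = Vec::new();
    for &(k, v, w) in input {
        if k >= threshold {
            passed.push((k, v.to_string(), w)); // unmodified tuple
        } else {
            // report_func analogue: build an error value, weight 1
            errors.push((format!("key {k} below threshold {threshold}"), 1));
        }
    }
    (passed, errors)
}

fn main() {
    let input = [(5, "a", 1), (2, "b", 3)];
    let (passed, errors) = controlled_key_filter(&input, 3);
    assert_eq!(passed, vec![(5, "a".to_string(), 1)]);
    assert_eq!(errors.len(), 1);
    assert_eq!(errors[0].1, 1); // error tuples always carry weight 1
}
```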
pub fn controlled_value_filter<T, E, F, RF>(
&self,
threshold: &Stream<C, TypedBox<T, DynData>>,
filter_func: F,
report_func: RF,
) -> (Stream<C, Z>, Stream<C, OrdZSet<E>>)
A controlled filter operator that discards input key/value pairs by comparing them to a scalar
value in the threshold stream.
This operator compares each key/value pair in the input indexed Z-set against the current value in
the threshold stream using filter_func and discards tuples that don’t pass the filter.
Returns a pair of output streams:
- Filtered output stream: (key, value, weight) tuples that pass the check are sent to the first output stream unmodified.
- Error stream: tuples that don’t pass the check are transformed by report_func and sent to the second output stream. More precisely, report_func is invoked for each (key, value, weight) tuple whose key and value do not pass the filter and returns a value of type E, which is sent with weight 1 to the error stream.
pub fn controlled_value_filter_typed<T, E, F, RF>(
&self,
threshold: &Stream<C, T>,
filter_func: F,
report_func: RF,
) -> (Stream<C, Z>, Stream<C, OrdZSet<E>>)
Like Self::controlled_value_filter, but values in the threshold stream are strongly typed
instead of having type TypedBox.
impl<C, Z> Stream<C, Z>
pub fn stream_distinct(&self) -> Stream<C, Z>
Reduces input batches to one occurrence of each element.
For each input batch B, the operator produces an output batch
that contains at most one occurrence of each tuple in B.
Specifically, for each input tuple (key, value, weight) with
weight > 0 the operator produces an output tuple (key, value, 1).
Tuples with weight <= 0 are dropped.
Intuitively, the operator converts the input multiset into a set by eliminating duplicates.
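The per-batch rule can be sketched directly, assuming plain tuples rather than the real batch types: clamp positive weights to 1 and drop the rest.

```rust
// stream_distinct semantics on one batch: each (key, value, weight)
// tuple with weight > 0 becomes (key, value, 1); tuples with
// weight <= 0 are dropped.
fn stream_distinct(batch: &[(&str, i64, i64)]) -> Vec<(String, i64, i64)> {
    batch
        .iter()
        .filter(|&&(_, _, w)| w > 0)
        .map(|&(k, v, _)| (k.to_string(), v, 1))
        .collect()
}

fn main() {
    let batch = [("a", 1, 5), ("b", 2, -1), ("c", 3, 1)];
    let out = stream_distinct(&batch);
    // The multiset {5 x a, 1 x c} minus the negative-weight b
    // becomes the set {a, c}.
    assert_eq!(
        out,
        vec![("a".to_string(), 1, 1), ("c".to_string(), 3, 1)]
    );
}
```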
impl<C, B> Stream<C, B>
pub fn dyn_accumulate_trace(
&self,
output_factories: &<Spine<<<C as WithClock>::Time as Timestamp>::TimedBatch<B>> as BatchReader>::Factories,
batch_factories: &B::Factories,
) -> Stream<C, Spine<<<C as WithClock>::Time as Timestamp>::TimedBatch<B>>>
pub fn dyn_accumulate_trace_with_bound(
&self,
output_factories: &<Spine<<<C as WithClock>::Time as Timestamp>::TimedBatch<B>> as BatchReader>::Factories,
batch_factories: &B::Factories,
lower_key_bound: TraceBound<B::Key>,
lower_val_bound: TraceBound<B::Val>,
) -> Stream<C, Spine<<<C as WithClock>::Time as Timestamp>::TimedBatch<B>>>
pub fn dyn_accumulate_integrate_trace_retain_keys<TS>(
&self,
bounds_stream: &Stream<C, Box<TS>>,
retain_key_func: Box<dyn Fn(&TS) -> Filter<B::Key>>,
)
pub fn dyn_accumulate_integrate_trace_retain_values<TS>(
&self,
bounds_stream: &Stream<C, Box<TS>>,
retain_val_func: Box<dyn Fn(&TS) -> Filter<B::Val>>,
)
pub fn dyn_accumulate_integrate_trace_retain_values_last_n<TS>(
&self,
bounds_stream: &Stream<C, Box<TS>>,
retain_val_func: Box<dyn Fn(&TS) -> Filter<B::Val>>,
n: usize,
)
pub fn dyn_accumulate_integrate_trace_retain_values_top_n<TS>(
&self,
val_factory: &'static dyn Factory<B::Val>,
bounds_stream: &Stream<C, Box<TS>>,
retain_val_func: Box<dyn Fn(&TS) -> Filter<B::Val>>,
n: usize,
)
pub fn dyn_accumulate_integrate_trace_retain_values_bottom_n<TS>(
&self,
val_factory: &'static dyn Factory<B::Val>,
bounds_stream: &Stream<C, Box<TS>>,
retain_val_func: Box<dyn Fn(&TS) -> Filter<B::Val>>,
n: usize,
)
pub fn dyn_accumulate_integrate_trace( &self, factories: &B::Factories, ) -> Stream<C, Spine<B>>
pub fn dyn_accumulate_integrate_trace_with_bound( &self, factories: &B::Factories, lower_key_bound: TraceBound<B::Key>, lower_val_bound: TraceBound<B::Val>, ) -> Stream<C, Spine<B>>
impl<C, B> Stream<C, Spine<B>>
pub fn accumulate_delay_trace(&self) -> Stream<C, SpineSnapshot<B>>
Returns the trace of self delayed by one clock cycle.
impl<C, B> Stream<C, B>
pub fn dyn_accumulate(
&self,
factories: &B::Factories,
) -> Stream<C, Option<Spine<B>>>
See Stream::accumulate.
pub fn dyn_accumulate_with_enable_count(
&self,
factories: &B::Factories,
) -> (Stream<C, Option<Spine<B>>>, Arc<AtomicUsize>)
impl<C, Z> Stream<C, Z>
pub fn dyn_average<A, W>(
&self,
persistent_id: Option<&str>,
factories: &AvgFactories<Z, A, W, C::Time>,
f: Box<dyn Fn(&Z::Key, &Z::Val, &DynZWeight, &mut W)>,
out_func: Box<dyn AggOutputFunc<W, A>>,
) -> Stream<C, OrdIndexedZSet<Z::Key, A>>
See Stream::average.
impl Stream<RootCircuit, MonoIndexedZSet>
pub fn dyn_chain_aggregate_mono( &self, persistent_id: Option<&str>, input_factories: &<MonoIndexedZSet as BatchReader>::Factories, output_factories: &<MonoIndexedZSet as BatchReader>::Factories, finit: Box<dyn Fn(&mut DynData, &DynData, ZWeight)>, fupdate: Box<dyn Fn(&mut DynData, &DynData, ZWeight)>, ) -> Stream<RootCircuit, MonoIndexedZSet>
impl<Z> Stream<RootCircuit, Z>where
Z: IndexedZSet,
pub fn dyn_chain_aggregate<OZ>(
&self,
persistent_id: Option<&str>,
input_factories: &Z::Factories,
output_factories: &OZ::Factories,
finit: Box<dyn Fn(&mut OZ::Val, &Z::Val, ZWeight)>,
fupdate: Box<dyn Fn(&mut OZ::Val, &Z::Val, ZWeight)>,
) -> Stream<RootCircuit, OZ>where
OZ: IndexedZSet<Key = Z::Key>,
impl Stream<RootCircuit, MonoIndexedZSet>
pub fn dyn_aggregate_mono( &self, persistent_id: Option<&str>, factories: &IncAggregateFactories<MonoIndexedZSet, MonoIndexedZSet, ()>, aggregator: &dyn DynAggregator<DynData, (), DynZWeight, Accumulator = DynData, Output = DynData>, ) -> Stream<RootCircuit, MonoIndexedZSet>
pub fn dyn_aggregate_linear_mono( &self, persistent_id: Option<&str>, factories: &IncAggregateLinearFactories<MonoIndexedZSet, DynWeight, MonoIndexedZSet, ()>, agg_func: Box<dyn Fn(&DynData, &DynData, &DynZWeight, &mut DynWeight)>, out_func: Box<dyn WeightedCountOutFunc<DynWeight, DynData>>, ) -> Stream<RootCircuit, MonoIndexedZSet>
impl Stream<NestedCircuit, MonoIndexedZSet>
pub fn dyn_aggregate_mono( &self, persistent_id: Option<&str>, factories: &IncAggregateFactories<MonoIndexedZSet, MonoIndexedZSet, <NestedCircuit as WithClock>::Time>, aggregator: &dyn DynAggregator<DynData, <NestedCircuit as WithClock>::Time, DynZWeight, Accumulator = DynData, Output = DynData>, ) -> Stream<NestedCircuit, MonoIndexedZSet>
pub fn dyn_aggregate_linear_mono( &self, persistent_id: Option<&str>, factories: &IncAggregateLinearFactories<MonoIndexedZSet, DynWeight, MonoIndexedZSet, <NestedCircuit as WithClock>::Time>, agg_func: Box<dyn Fn(&DynData, &DynData, &DynZWeight, &mut DynWeight)>, out_func: Box<dyn WeightedCountOutFunc<DynWeight, DynData>>, ) -> Stream<NestedCircuit, MonoIndexedZSet>
impl<C, Z> Stream<C, Z>
pub fn dyn_stream_aggregate<Acc, Out>(
&self,
factories: &StreamAggregateFactories<Z, OrdIndexedZSet<Z::Key, Out>>,
aggregator: &dyn DynAggregator<Z::Val, (), Z::R, Accumulator = Acc, Output = Out>,
) -> Stream<C, OrdIndexedZSet<Z::Key, Out>>
pub fn dyn_stream_aggregate_generic<Acc, Out, O>(
&self,
factories: &StreamAggregateFactories<Z, O>,
aggregator: &dyn DynAggregator<Z::Val, (), Z::R, Accumulator = Acc, Output = Out>,
) -> Stream<C, O>
Like Self::dyn_stream_aggregate, but can return any batch type.
pub fn dyn_stream_aggregate_linear<A>(
&self,
factories: &StreamLinearAggregateFactories<Z, A, OrdIndexedZSet<Z::Key, A>>,
f: Box<dyn Fn(&Z::Key, &Z::Val, &Z::R, &mut A)>,
) -> Stream<C, OrdIndexedZSet<Z::Key, A>>
pub fn dyn_stream_aggregate_linear_generic<A, O>(
&self,
factories: &StreamLinearAggregateFactories<Z, A, O>,
agg_func: Box<dyn Fn(&Z::Key, &Z::Val, &Z::R, &mut A)>,
out_func: Box<dyn WeightedCountOutFunc<A, O::Val>>,
) -> Stream<C, O>
Like Self::dyn_stream_aggregate_linear, but can return any batch
type.
pub fn dyn_aggregate<Acc, Out>(
&self,
persistent_id: Option<&str>,
factories: &IncAggregateFactories<Z, OrdIndexedZSet<Z::Key, Out>, C::Time>,
aggregator: &dyn DynAggregator<Z::Val, <C as WithClock>::Time, Z::R, Accumulator = Acc, Output = Out>,
) -> Stream<C, OrdIndexedZSet<Z::Key, Out>>
See Stream::aggregate.
pub fn dyn_aggregate_generic<Acc, Out, O>(
&self,
persistent_id: Option<&str>,
factories: &IncAggregateFactories<Z, O, C::Time>,
aggregator: &dyn DynAggregator<Z::Val, <C as WithClock>::Time, Z::R, Accumulator = Acc, Output = Out>,
) -> Stream<C, O>
Like Self::dyn_aggregate, but can return any batch type.
pub fn dyn_aggregate_linear<A>(
&self,
persistent_id: Option<&str>,
factories: &IncAggregateLinearFactories<Z, A, OrdIndexedZSet<Z::Key, A>, C::Time>,
f: Box<dyn Fn(&Z::Key, &Z::Val, &Z::R, &mut A)>,
) -> Stream<C, OrdIndexedZSet<Z::Key, A>>
pub fn dyn_aggregate_linear_generic<A, O>(
&self,
persistent_id: Option<&str>,
factories: &IncAggregateLinearFactories<Z, A, O, C::Time>,
agg_func: Box<dyn Fn(&Z::Key, &Z::Val, &Z::R, &mut A)>,
out_func: Box<dyn WeightedCountOutFunc<A, O::Val>>,
) -> Stream<C, O>
Like Self::dyn_aggregate_linear, but can return any batch type.
impl Stream<RootCircuit, MonoIndexedZSet>
pub fn dyn_aggregate_linear_retain_keys_mono( &self, persistent_id: Option<&str>, factories: &IncAggregateLinearFactories<MonoIndexedZSet, DynWeight, MonoIndexedZSet, ()>, waterline: &Stream<RootCircuit, Box<DynData>>, retain_key_func: Box<dyn Fn(&DynData) -> Filter<DynData>>, agg_func: Box<dyn Fn(&DynData, &DynData, &DynZWeight, &mut DynWeight)>, out_func: Box<dyn WeightedCountOutFunc<DynWeight, DynData>>, ) -> Stream<RootCircuit, MonoIndexedZSet>
impl<Z> Stream<RootCircuit, Z>where
Z: Clone + 'static,
pub fn dyn_aggregate_linear_retain_keys_generic<A, O, TS>(
&self,
persistent_id: Option<&str>,
factories: &IncAggregateLinearFactories<Z, A, O, ()>,
waterline: &Stream<RootCircuit, Box<TS>>,
retain_key_func: Box<dyn Fn(&TS) -> Filter<Z::Key>>,
agg_func: Box<dyn Fn(&Z::Key, &Z::Val, &Z::R, &mut A)>,
out_func: Box<dyn WeightedCountOutFunc<A, O::Val>>,
) -> Stream<RootCircuit, O>where
Z: IndexedZSet<Time = ()>,
O: IndexedZSet<Key = Z::Key>,
A: WeightTrait + ?Sized,
TS: DataTrait + ?Sized,
Box<TS>: Clone,
impl Stream<RootCircuit, MonoIndexedZSet>
pub fn dyn_asof_join_mono( &self, factories: &AsofJoinFactories<DynData, MonoIndexedZSet, MonoIndexedZSet, MonoZSet>, other: &Stream<RootCircuit, MonoIndexedZSet>, ts_func1: Box<dyn Fn(&DynData, &mut DynData)>, tscmp_func: Box<dyn Fn(&DynData, &DynData) -> Ordering>, valts_cmp_func: Box<dyn Fn(&DynData, &DynData) -> Ordering>, join_func: Box<AsofJoinFunc<DynData, DynData, DynData, DynData, DynUnit>>, ) -> Stream<RootCircuit, MonoZSet>
impl<I1> Stream<RootCircuit, I1>where
I1: IndexedZSet + Send,
pub fn dyn_asof_join<TS, I2, V>(
&self,
factories: &AsofJoinFactories<TS, I1, I2, OrdZSet<V>>,
other: &Stream<RootCircuit, I2>,
ts_func1: Box<dyn Fn(&I1::Val, &mut TS)>,
tscmp_func: Box<dyn Fn(&I1::Val, &I2::Val) -> Ordering>,
valts_cmp_func: Box<dyn Fn(&I1::Val, &TS) -> Ordering>,
join_func: Box<AsofJoinFunc<I1::Key, I1::Val, I2::Val, V, DynUnit>>,
) -> Stream<RootCircuit, OrdZSet<V>>
See Stream::asof_join.
pub fn dyn_asof_join_index<TS, I2, K, V>( &self, factories: &AsofJoinFactories<TS, I1, I2, OrdIndexedZSet<K, V>>, other: &Stream<RootCircuit, I2>, ts_func1: Box<dyn Fn(&I1::Val, &mut TS)>, tscmp_func: Box<dyn Fn(&I1::Val, &I2::Val) -> Ordering>, valts_cmp_func: Box<dyn Fn(&I1::Val, &TS) -> Ordering>, join_func: Box<AsofJoinFunc<I1::Key, I1::Val, I2::Val, K, V>>, ) -> Stream<RootCircuit, OrdIndexedZSet<K, V>>
pub fn dyn_asof_join_generic<TS, I2, Z>(
&self,
factories: &AsofJoinFactories<TS, I1, I2, Z>,
other: &Stream<RootCircuit, I2>,
ts_func1: Box<dyn Fn(&I1::Val, &mut TS)>,
tscmp_func: Box<dyn Fn(&I1::Val, &I2::Val) -> Ordering>,
valts_cmp_func: Box<dyn Fn(&I1::Val, &TS) -> Ordering>,
join_func: Box<AsofJoinFunc<I1::Key, I1::Val, I2::Val, Z::Key, Z::Val>>,
) -> Stream<RootCircuit, Z>
Like Self::dyn_asof_join_index, but can return any indexed Z-set type.
impl<C, B> Stream<C, B>where
C: Circuit,
B: IndexedZSet,
pub fn dyn_accumulate_trace_balanced(
&self,
trace_factories: &<TimedSpine<B, C> as BatchReader>::Factories,
batch_factories: &B::Factories,
) -> (Stream<C, Option<Spine<B>>>, Stream<C, TimedSpine<B, C>>)
Shard and integrate self using a dynamic sharding policy selected by the balancer.
Combines exchange, accumulator, and integral into a single operator. In steady state, applies the selected sharding policy to the input stream. When the sharding policy changes, retracts the previous content of the integral and re-distributes it using the new policy.
§Returns
The output of the accumulator and the integral.
§Circuit
│
│self
▼
┌─────────────────────────┐
┌─────────►│RebalancingExchangeSender│◄───┐
│ └────────────┬────────────┘ │
│ . │delayed_acc
│ . │
│ ▼ │
│ ┌────────────────┐ ┌──┐
│ │ExchangeReceiver│ │Z1│
│ └────────────────┘ └──┘
│ │ ▲
│ │ │
│ ▼ │
│ ┌──────────────────────┐ │
│ │RebalancingAccumulator│────┘
delayed_trace│ └──────────────────────┘
│ │
│ │────────────────────┐
├──────────────────┐ │ │
│ ▼ ▼ │
┌───────┴─────────┐ ┌─────────────────────┐ │
│AccumulateZ1Trace│◄───┤AccumulateTraceAppend│ │
└─────────────────┘ └────────┬────────────┘ │
│ │
│trace │accumulator_stream
│ │
▼ ▼

The two feedback streams (delayed_acc and delayed_trace) are used by RebalancingExchangeSender to retract previously accumulated content of the integral when the sharding policy changes.
impl<C, B> Stream<C, B>
pub fn dyn_accumulate_with_feedback_stream( &self, factories: &B::Factories, ) -> (Stream<C, Option<Spine<B>>>, Rc<RefCell<RebalancingAccumulatorInner<B>>>, RefStreamValue<EmptyCheckpoint<Vec<Arc<B>>>>)
impl<C, B> Stream<C, B>
pub fn dyn_gather(
&self,
factories: &B::Factories,
receiver_worker: usize,
) -> Stream<C, B>
See Stream::gather.
impl<C, IB> Stream<C, IB>
pub fn dyn_shard(&self, factories: &IB::Factories) -> Stream<C, IB>
See Stream::shard.
pub fn dyn_shard_workers(
&self,
workers: Range<usize>,
factories: &IB::Factories,
) -> Stream<C, IB>
pub fn dyn_shard_generic<OB>(
&self,
factories: &OB::Factories,
) -> Option<Stream<C, OB>>
Like Self::dyn_shard, but can assemble the results into any output batch
type OB.
Returns None when the circuit is not running inside a multithreaded
runtime or is running in a runtime with a single worker thread.
pub fn dyn_shard_generic_workers<OB>(
&self,
workers: Range<usize>,
factories: &OB::Factories,
) -> Option<Stream<C, OB>>
Like Self::dyn_shard, but can assemble the results into any output batch
type OB.
Returns None when the circuit is not running inside a multithreaded
runtime or is running in a runtime with a single worker thread.
impl<C, T> Stream<C, T>where
C: Circuit,
T: 'static,
pub fn mark_sharded(&self) -> Self
Marks the data within the current stream as sharded, meaning that all
further calls to .shard() will have no effect.
This must only be used on streams of values that are properly sharded across workers; otherwise, the dataflow will yield incorrect results.
pub fn has_sharded_version(&self) -> bool
Returns true if a sharded version of the current stream exists.
pub fn try_sharded_version(&self) -> Self
Returns the sharded version of the stream if it exists
(which may be the stream itself or the result of applying
the shard operator to it). Otherwise, returns self.
pub fn try_unsharded_version(&self) -> Self
Returns the unsharded version of the stream if it exists, and otherwise
self.
pub fn is_sharded(&self) -> bool
Returns true if this stream is sharded.
pub fn mark_sharded_if<C2, U>(&self, input: &Stream<C2, U>)where
C2: Circuit,
U: 'static,
Marks self as sharded if input has a sharded version of itself.
impl<C, T> Stream<C, T>
pub fn dyn_consolidate(
&self,
factories: &<T::Batch as BatchReader>::Factories,
) -> Stream<C, T::Batch>
See Stream::consolidate.
impl Stream<RootCircuit, MonoIndexedZSet>
pub fn dyn_controlled_key_filter_mono( &self, factories: ControlledFilterFactories<MonoIndexedZSet, DynData>, threshold: &Stream<RootCircuit, Box<DynData>>, filter_func: Box<dyn Fn(&DynData, &DynData) -> bool>, report_func: Box<dyn Fn(&DynData, &DynData, &DynData, ZWeight, &mut DynData)>, ) -> (Stream<RootCircuit, MonoIndexedZSet>, Stream<RootCircuit, MonoZSet>)
impl Stream<RootCircuit, MonoZSet>
pub fn dyn_controlled_key_filter_mono( &self, factories: ControlledFilterFactories<MonoZSet, DynData>, threshold: &Stream<RootCircuit, Box<DynData>>, filter_func: Box<dyn Fn(&DynData, &DynData) -> bool>, report_func: Box<dyn Fn(&DynData, &DynData, &DynUnit, ZWeight, &mut DynData)>, ) -> (Stream<RootCircuit, MonoZSet>, Stream<RootCircuit, MonoZSet>)
impl<C, Z> Stream<C, Z>
pub fn dyn_controlled_key_filter<T, E>( &self, factories: ControlledFilterFactories<Z, E>, threshold: &Stream<C, Box<T>>, filter_func: Box<dyn Fn(&T, &Z::Key) -> bool>, report_func: Box<dyn Fn(&T, &Z::Key, &Z::Val, ZWeight, &mut E)>, ) -> (Stream<C, Z>, Stream<C, OrdZSet<E>>)
pub fn dyn_controlled_value_filter<T, E>( &self, factories: ControlledFilterFactories<Z, E>, threshold: &Stream<C, Box<T>>, filter_func: Box<dyn Fn(&T, &Z::Key, &Z::Val) -> bool>, report_func: Box<dyn Fn(&T, &Z::Key, &Z::Val, ZWeight, &mut E)>, ) -> (Stream<C, Z>, Stream<C, OrdZSet<E>>)
impl<C, Z> Stream<C, Z>where
C: Circuit,
Z: IndexedZSet,
pub fn dyn_weighted_count(
&self,
persistent_id: Option<&str>,
factories: &IncAggregateLinearFactories<Z, Z::R, OrdIndexedZSet<Z::Key, Z::R>, C::Time>,
) -> Stream<C, OrdIndexedZSet<Z::Key, Z::R>>
pub fn dyn_weighted_count_generic<A, O>(
&self,
persistent_id: Option<&str>,
factories: &IncAggregateLinearFactories<Z, Z::R, O, C::Time>,
out_func: Box<dyn WeightedCountOutFunc<Z::R, A>>,
) -> Stream<C, O>
Like Self::dyn_weighted_count, but can return any batch type.
pub fn dyn_distinct_count(
&self,
persistent_id: Option<&str>,
factories: &DistinctCountFactories<Z, OrdIndexedZSet<Z::Key, Z::R>, C::Time>,
) -> Stream<C, OrdIndexedZSet<Z::Key, Z::R>>where
Z: Send,
pub fn dyn_distinct_count_generic<A, O>(
&self,
persistent_id: Option<&str>,
factories: &DistinctCountFactories<Z, O, C::Time>,
out_func: Box<dyn WeightedCountOutFunc<Z::R, A>>,
) -> Stream<C, O>
Like Self::dyn_distinct_count, but can return any batch type.
pub fn dyn_stream_weighted_count(
&self,
factories: &StreamLinearAggregateFactories<Z, Z::R, OrdIndexedZSet<Z::Key, Z::R>>,
) -> Stream<C, OrdIndexedZSet<Z::Key, Z::R>>
pub fn dyn_stream_weighted_count_generic<A, O>(
&self,
factories: &StreamLinearAggregateFactories<Z, Z::R, O>,
out_func: Box<dyn WeightedCountOutFunc<Z::R, A>>,
) -> Stream<C, O>
Like Self::dyn_stream_weighted_count, but can return any batch type.
pub fn dyn_stream_distinct_count(
&self,
factories: &StreamDistinctCountFactories<Z, OrdIndexedZSet<Z::Key, Z::R>>,
) -> Stream<C, OrdIndexedZSet<Z::Key, Z::R>>where
Z: Send,
pub fn dyn_stream_distinct_count_generic<A, O>(
&self,
factories: &StreamDistinctCountFactories<Z, O>,
out_func: Box<dyn WeightedCountOutFunc<Z::R, A>>,
) -> Stream<C, O>
Like Self::dyn_stream_distinct_count, but can return any batch type.
impl<C, D> Stream<C, D>where
C: Circuit,
D: 'static,
pub fn mark_distinct(&self) -> Self
Marks the data within the current stream as distinct, meaning that all
further calls to .distinct() will have no effect.
This must only be used on streams whose integral contains only elements with unit weights; otherwise, the dataflow will yield incorrect results.
pub fn has_distinct_version(&self) -> bool
Returns true if a distinct version of the current stream exists.
pub fn is_distinct(&self) -> bool
Returns true if the current stream is known to be distinct.
pub fn try_distinct_version(&self) -> Self
Returns the distinct version of the stream if it exists; otherwise, returns self.
pub fn mark_distinct_if<C2, D2>(&self, input: &Stream<C2, D2>)where
C2: Circuit,
D2: 'static,
Marks self as distinct if input has a distinct version of itself.
impl Stream<RootCircuit, MonoIndexedZSet>
pub fn dyn_distinct_mono( &self, factories: &DistinctFactories<MonoIndexedZSet, ()>, ) -> Stream<RootCircuit, MonoIndexedZSet>
pub fn dyn_hash_distinct_mono( &self, factories: &HashDistinctFactories<MonoIndexedZSet, ()>, ) -> Stream<RootCircuit, MonoIndexedZSet>
impl Stream<RootCircuit, MonoZSet>
pub fn dyn_distinct_mono( &self, factories: &DistinctFactories<MonoZSet, ()>, ) -> Stream<RootCircuit, MonoZSet>
pub fn dyn_hash_distinct_mono( &self, factories: &HashDistinctFactories<MonoZSet, ()>, ) -> Stream<RootCircuit, MonoZSet>
impl Stream<NestedCircuit, MonoIndexedZSet>
pub fn dyn_distinct_mono( &self, factories: &DistinctFactories<MonoIndexedZSet, <NestedCircuit as WithClock>::Time>, ) -> Stream<NestedCircuit, MonoIndexedZSet>
pub fn dyn_hash_distinct_mono( &self, factories: &HashDistinctFactories<MonoIndexedZSet, <NestedCircuit as WithClock>::Time>, ) -> Stream<NestedCircuit, MonoIndexedZSet>
impl Stream<NestedCircuit, MonoZSet>
pub fn dyn_distinct_mono( &self, factories: &DistinctFactories<MonoZSet, <NestedCircuit as WithClock>::Time>, ) -> Stream<NestedCircuit, MonoZSet>
pub fn dyn_hash_distinct_mono( &self, factories: &HashDistinctFactories<MonoZSet, <NestedCircuit as WithClock>::Time>, ) -> Stream<NestedCircuit, MonoZSet>
impl<C, Z> Stream<C, Z>
where
C: Circuit,
pub fn dyn_stream_distinct(
&self,
input_factories: &Z::Factories,
) -> Stream<C, Z>
where
Z: IndexedZSet + Send,
pub fn dyn_hash_distinct( &self, factories: &HashDistinctFactories<Z, C::Time>, ) -> Stream<C, Z>
pub fn dyn_distinct(
&self,
factories: &DistinctFactories<Z, C::Time>,
) -> Stream<C, Z>
where
Z: IndexedZSet + Send,
See Stream::distinct.
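Conceptually, distinct turns a multiset into a set: each key whose cumulative weight is positive appears in the output with weight 1, and keys with zero or negative weight are dropped. A minimal illustrative sketch over a plain HashMap (this is not the DBSP implementation, which is incremental and trace-backed):

```rust
use std::collections::HashMap;

// Illustrative sketch only: `distinct` over a non-indexed Z-set keeps each
// key whose total weight is positive and clamps its weight to 1.
fn distinct(zset: &HashMap<&'static str, i64>) -> HashMap<&'static str, i64> {
    zset.iter()
        .filter(|&(_, &w)| w > 0)
        .map(|(&k, _)| (k, 1))
        .collect()
}

fn main() {
    let zset = HashMap::from([("apple", 3), ("pear", 1), ("plum", -2)]);
    let d = distinct(&zset);
    assert_eq!(d.get("apple"), Some(&1)); // multiplicity 3 clamped to 1
    assert_eq!(d.get("pear"), Some(&1));  // unit weight unchanged
    assert_eq!(d.get("plum"), None);      // non-positive weight dropped
}
```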
pub fn dyn_distinct_inner(
&self,
factories: &DistinctFactories<Z, C::Time>,
) -> Stream<C, Z>
where
Z: IndexedZSet + Send,
impl Stream<RootCircuit, MonoIndexedZSet>
pub fn dyn_filter_mono( &self, filter_func: Box<dyn Fn((&DynData, &DynData)) -> bool>, ) -> Self
pub fn dyn_map_mono( &self, output_factories: &MonoZSetFactories, map_func: Box<dyn Fn((&DynData, &DynData), &mut DynPair<DynData, DynUnit>)>, ) -> Stream<RootCircuit, MonoZSet>
pub fn dyn_map_index_mono( &self, output_factories: &MonoIndexedZSetFactories, map_func: Box<dyn Fn((&DynData, &DynData), &mut DynPair<DynData, DynData>)>, ) -> Stream<RootCircuit, MonoIndexedZSet>
pub fn dyn_flat_map_mono( &self, output_factories: &MonoZSetFactories, func: Box<dyn FnMut((&DynData, &DynData), &mut dyn FnMut(&mut DynData, &mut DynUnit))>, ) -> Stream<RootCircuit, MonoZSet>
pub fn dyn_flat_map_index_mono( &self, output_factories: &MonoIndexedZSetFactories, func: Box<dyn FnMut((&DynData, &DynData), &mut dyn FnMut(&mut DynData, &mut DynData))>, ) -> Stream<RootCircuit, MonoIndexedZSet>
impl Stream<RootCircuit, MonoZSet>
pub fn dyn_filter_mono( &self, filter_func: Box<dyn Fn(&DynData) -> bool>, ) -> Self
pub fn dyn_map_mono( &self, output_factories: &MonoZSetFactories, map_func: Box<dyn Fn(&DynData, &mut DynPair<DynData, DynUnit>)>, ) -> Stream<RootCircuit, MonoZSet>
pub fn dyn_map_index_mono( &self, output_factories: &MonoIndexedZSetFactories, map_func: Box<dyn Fn(&DynData, &mut DynPair<DynData, DynData>)>, ) -> Stream<RootCircuit, MonoIndexedZSet>
pub fn dyn_flat_map_mono( &self, output_factories: &MonoZSetFactories, func: Box<dyn FnMut(&DynData, &mut dyn FnMut(&mut DynData, &mut DynUnit))>, ) -> Stream<RootCircuit, MonoZSet>
pub fn dyn_flat_map_index_mono( &self, output_factories: &MonoIndexedZSetFactories, func: Box<dyn FnMut(&DynData, &mut dyn FnMut(&mut DynData, &mut DynData))>, ) -> Stream<RootCircuit, MonoIndexedZSet>
impl Stream<NestedCircuit, MonoIndexedZSet>
pub fn dyn_filter_mono( &self, filter_func: Box<dyn Fn((&DynData, &DynData)) -> bool>, ) -> Self
pub fn dyn_map_mono( &self, output_factories: &MonoZSetFactories, map_func: Box<dyn Fn((&DynData, &DynData), &mut DynPair<DynData, DynUnit>)>, ) -> Stream<NestedCircuit, MonoZSet>
pub fn dyn_map_index_mono( &self, output_factories: &MonoIndexedZSetFactories, map_func: Box<dyn Fn((&DynData, &DynData), &mut DynPair<DynData, DynData>)>, ) -> Stream<NestedCircuit, MonoIndexedZSet>
pub fn dyn_flat_map_mono( &self, output_factories: &MonoZSetFactories, func: Box<dyn FnMut((&DynData, &DynData), &mut dyn FnMut(&mut DynData, &mut DynUnit))>, ) -> Stream<NestedCircuit, MonoZSet>
pub fn dyn_flat_map_index_mono( &self, output_factories: &MonoIndexedZSetFactories, func: Box<dyn FnMut((&DynData, &DynData), &mut dyn FnMut(&mut DynData, &mut DynData))>, ) -> Stream<NestedCircuit, MonoIndexedZSet>
impl Stream<NestedCircuit, MonoZSet>
pub fn dyn_filter_mono( &self, filter_func: Box<dyn Fn(&DynData) -> bool>, ) -> Self
pub fn dyn_map_mono( &self, output_factories: &MonoZSetFactories, map_func: Box<dyn Fn(&DynData, &mut DynPair<DynData, DynUnit>)>, ) -> Stream<NestedCircuit, MonoZSet>
pub fn dyn_map_index_mono( &self, output_factories: &MonoIndexedZSetFactories, map_func: Box<dyn Fn(&DynData, &mut DynPair<DynData, DynData>)>, ) -> Stream<NestedCircuit, MonoIndexedZSet>
pub fn dyn_flat_map_mono( &self, output_factories: &MonoZSetFactories, func: Box<dyn FnMut(&DynData, &mut dyn FnMut(&mut DynData, &mut DynUnit))>, ) -> Stream<NestedCircuit, MonoZSet>
pub fn dyn_flat_map_index_mono( &self, output_factories: &MonoIndexedZSetFactories, func: Box<dyn FnMut(&DynData, &mut dyn FnMut(&mut DynData, &mut DynData))>, ) -> Stream<NestedCircuit, MonoIndexedZSet>
impl<C: Circuit, B: DynFilterMap> Stream<C, B>
pub fn dyn_filter(
&self,
filter_func: Box<dyn Fn(B::DynItemRef<'_>) -> bool>,
) -> Self
See Stream::filter.
pub fn dyn_map<K: DataTrait + ?Sized>(
&self,
output_factories: &OrdWSetFactories<K, B::R>,
map_func: Box<dyn Fn(B::DynItemRef<'_>, &mut DynPair<K, DynUnit>)>,
) -> Stream<C, OrdWSet<K, B::R>>
See Stream::map.
pub fn dyn_map_index<K: DataTrait + ?Sized, V: DataTrait + ?Sized>(
&self,
output_factories: &OrdIndexedWSetFactories<K, V, B::R>,
map_func: Box<dyn Fn(B::DynItemRef<'_>, &mut DynPair<K, V>)>,
) -> Stream<C, OrdIndexedWSet<K, V, B::R>>
Behaves as Self::dyn_map followed by
index, but is more efficient. Assembles
output records into VecIndexedZSet batches.
pub fn dyn_map_generic<O>(
&self,
output_factories: &O::Factories,
map_func: Box<dyn Fn(B::DynItemRef<'_>, &mut DynPair<O::Key, O::Val>)>,
) -> Stream<C, O>
Like Self::dyn_map_index, but can return any batch type.
pub fn dyn_flat_map<K: DataTrait + ?Sized>(
&self,
output_factories: &OrdWSetFactories<K, B::R>,
func: Box<dyn FnMut(B::DynItemRef<'_>, &mut dyn FnMut(&mut K, &mut DynUnit))>,
) -> Stream<C, OrdWSet<K, B::R>>
See Stream::flat_map.
pub fn dyn_flat_map_index<K: DataTrait + ?Sized, V: DataTrait + ?Sized>(
&self,
output_factories: &OrdIndexedWSetFactories<K, V, B::R>,
func: Box<dyn FnMut(B::DynItemRef<'_>, &mut dyn FnMut(&mut K, &mut V))>,
) -> Stream<C, OrdIndexedWSet<K, V, B::R>>
pub fn dyn_flat_map_generic<O>(
&self,
output_factories: &O::Factories,
func: Box<dyn FnMut(B::DynItemRef<'_>, &mut dyn FnMut(&mut O::Key, &mut O::Val))>,
) -> Stream<C, O>
Like Self::dyn_flat_map_index, but can return any batch type.
impl<B> Stream<RootCircuit, B>
where
B: IndexedZSet + Send,
pub fn dyn_lag<OV>(
&self,
persistent_id: Option<&str>,
factories: &LagFactories<B, OV>,
offset: isize,
project: Box<dyn Fn(Option<&B::Val>, &mut OV)>,
) -> Stream<RootCircuit, OrdIndexedZSet<B::Key, DynPair<B::Val, OV>>>
See Stream::lag.
impl Stream<RootCircuit, MonoIndexedZSet>
pub fn dyn_lag_custom_order_mono( &self, persistent_id: Option<&str>, factories: &LagCustomOrdFactories<MonoIndexedZSet, DynData, DynData, DynData>, offset: isize, encode: Box<dyn Fn(&DynData, &mut DynData)>, project: Box<dyn Fn(Option<&DynData>, &mut DynData)>, decode: Box<dyn Fn(&DynData, &DynData, &mut DynData)>, ) -> Stream<RootCircuit, MonoIndexedZSet>
impl<B, K, V> Stream<RootCircuit, B>
pub fn dyn_lag_custom_order<V2, VL, OV>(
&self,
persistent_id: Option<&str>,
factories: &LagCustomOrdFactories<B, V2, VL, OV>,
offset: isize,
encode: Box<dyn Fn(&V, &mut V2)>,
project: Box<dyn Fn(Option<&V2>, &mut VL)>,
decode: Box<dyn Fn(&V2, &VL, &mut OV)>,
) -> Stream<RootCircuit, OrdIndexedZSet<K, OV>>
impl<B> Stream<RootCircuit, B>
where
B: IndexedZSet + Send,
pub fn dyn_topk_asc(
&self,
persistent_id: Option<&str>,
factories: &TopKFactories<B>,
k: usize,
) -> Stream<RootCircuit, OrdIndexedZSet<B::Key, B::Val>>
See Stream::topk_asc.
pub fn dyn_topk_desc(
&self,
persistent_id: Option<&str>,
factories: &TopKFactories<B>,
k: usize,
) -> Stream<RootCircuit, OrdIndexedZSet<B::Key, B::Val>>
See Stream::topk_desc.
impl Stream<RootCircuit, MonoIndexedZSet>
pub fn dyn_topk_custom_order_mono( &self, persistent_id: Option<&str>, factories: &TopKCustomOrdFactories<DynData, DynData, DynData, DynZWeight>, k: usize, encode: Box<dyn Fn(&DynData, &mut DynData)>, decode: Box<dyn Fn(&DynData) -> &DynData>, ) -> Self
pub fn dyn_topk_rank_custom_order_mono( &self, persistent_id: Option<&str>, factories: &TopKRankCustomOrdFactories<DynData, DynData, DynData>, k: usize, encode: Box<dyn Fn(&DynData, &mut DynData)>, rank_eq_func: Box<dyn Fn(&DynData, &DynData) -> bool>, output_func: Box<dyn Fn(i64, &DynData, &mut DynData)>, ) -> Stream<RootCircuit, MonoIndexedZSet>
pub fn dyn_topk_dense_rank_custom_order_mono( &self, persistent_id: Option<&str>, factories: &TopKRankCustomOrdFactories<DynData, DynData, DynData>, k: usize, encode: Box<dyn Fn(&DynData, &mut DynData)>, rank_eq_func: Box<dyn Fn(&DynData, &DynData) -> bool>, output_func: Box<dyn Fn(i64, &DynData, &mut DynData)>, ) -> Stream<RootCircuit, MonoIndexedZSet>
pub fn dyn_topk_row_number_custom_order_mono( &self, persistent_id: Option<&str>, factories: &TopKRankCustomOrdFactories<DynData, DynData, DynData>, k: usize, encode: Box<dyn Fn(&DynData, &mut DynData)>, output_func: Box<dyn Fn(i64, &DynData, &mut DynData)>, ) -> Stream<RootCircuit, MonoIndexedZSet>
impl<K, V> Stream<RootCircuit, OrdIndexedZSet<K, V>>
pub fn dyn_topk_custom_order<V2>(
&self,
persistent_id: Option<&str>,
factories: &TopKCustomOrdFactories<K, V, V2, DynZWeight>,
k: usize,
encode: Box<dyn Fn(&V, &mut V2)>,
decode: Box<dyn Fn(&V2) -> &V>,
) -> Self
pub fn dyn_topk_rank_custom_order<V2, OV>(
&self,
persistent_id: Option<&str>,
factories: &TopKRankCustomOrdFactories<K, V2, OV>,
k: usize,
encode: Box<dyn Fn(&V, &mut V2)>,
rank_eq_func: Box<dyn Fn(&V2, &V2) -> bool>,
output_func: Box<dyn Fn(i64, &V2, &mut OV)>,
) -> Stream<RootCircuit, OrdIndexedZSet<K, OV>>
pub fn dyn_topk_dense_rank_custom_order<V2, OV>(
&self,
persistent_id: Option<&str>,
factories: &TopKRankCustomOrdFactories<K, V2, OV>,
k: usize,
encode: Box<dyn Fn(&V, &mut V2)>,
rank_eq_func: Box<dyn Fn(&V2, &V2) -> bool>,
output_func: Box<dyn Fn(i64, &V2, &mut OV)>,
) -> Stream<RootCircuit, OrdIndexedZSet<K, OV>>
pub fn dyn_topk_row_number_custom_order<V2, OV>(
&self,
persistent_id: Option<&str>,
factories: &TopKRankCustomOrdFactories<K, V2, OV>,
k: usize,
encode: Box<dyn Fn(&V, &mut V2)>,
output_func: Box<dyn Fn(i64, &V2, &mut OV)>,
) -> Stream<RootCircuit, OrdIndexedZSet<K, OV>>
impl<C, CI> Stream<C, CI>
pub fn index<K, V>(
&self,
output_factories: &<OrdIndexedWSet<K, V, CI::R> as BatchReader>::Factories,
) -> Stream<C, OrdIndexedWSet<K, V, CI::R>>
Convert input batches to an indexed representation.
Converts input batches whose key type is a (key,value) tuple into an
indexed Z-set using the first element of each tuple as a key and the
second element as the value. The indexed Z-set representation is
used as input to various join and aggregation operators.
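The regrouping that index performs can be sketched with plain standard-library containers (illustrative only; the real operator works on DBSP batch types and carries weights in its own representation): a batch whose keys are (key, value) tuples becomes a map from each key to its values.

```rust
use std::collections::BTreeMap;

// Illustrative only: regroup a batch keyed by (key, value) tuples into an
// indexed form mapping each key to its (value, weight) entries.
fn index(
    batch: &[((&'static str, u32), i64)],
) -> BTreeMap<&'static str, Vec<(u32, i64)>> {
    let mut out: BTreeMap<&'static str, Vec<(u32, i64)>> = BTreeMap::new();
    for &((k, v), w) in batch {
        out.entry(k).or_default().push((v, w));
    }
    out
}

fn main() {
    let batch = [(("a", 1), 1), (("a", 2), 1), (("b", 7), 2)];
    let ix = index(&batch);
    assert_eq!(ix["a"], vec![(1, 1), (2, 1)]); // two values under key "a"
    assert_eq!(ix["b"], vec![(7, 2)]);
}
```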
pub fn index_generic<CO>(
&self,
output_factories: &CO::Factories,
) -> Stream<C, CO>
Like index, but can return any indexed Z-set type,
not just OrdIndexedZSet.
pub fn index_with<K, V, F>(
&self,
output_factories: &<OrdIndexedWSet<K, V, CI::R> as BatchReader>::Factories,
index_func: F,
) -> Stream<C, OrdIndexedWSet<K, V, CI::R>>
Convert input batches to an indexed representation with the help of a
user-provided function that maps a key in the input Z-set into an
output (key, value) pair.
Converts input batches into an indexed Z-set by applying index_func to
each key in the input batch and using the first element of the
resulting tuple as a key and the second element as the value. The
indexed Z-set representation is used as input to join and
aggregation operators.
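The same idea can be sketched with a standard-library map and an explicit index_func that turns each input key into an output (key, value) pair (illustrative only, not the DBSP operator). Here strings are indexed by their length:

```rust
use std::collections::BTreeMap;

// Illustrative only: apply `index_func` to every key of a non-indexed batch,
// using the first element of the result as the output key and the second as
// the value; weights are carried along unchanged.
fn index_with<F>(
    batch: &[(&'static str, i64)],
    index_func: F,
) -> BTreeMap<usize, Vec<(&'static str, i64)>>
where
    F: Fn(&'static str) -> (usize, &'static str),
{
    let mut out: BTreeMap<usize, Vec<(&'static str, i64)>> = BTreeMap::new();
    for &(k, w) in batch {
        let (out_key, out_val) = index_func(k);
        out.entry(out_key).or_default().push((out_val, w));
    }
    out
}

fn main() {
    let batch = [("ab", 1), ("cd", 1), ("xyz", 2)];
    let ix = index_with(&batch, |s| (s.len(), s)); // index strings by length
    assert_eq!(ix[&2], vec![("ab", 1), ("cd", 1)]);
    assert_eq!(ix[&3], vec![("xyz", 2)]);
}
```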
pub fn index_with_generic<CO, F>(
&self,
index_func: F,
output_factories: &CO::Factories,
) -> Stream<C, CO>
Like index_with, but can return any indexed
Z-set type, not just OrdIndexedZSet.
impl<K, V, U> Stream<RootCircuit, Vec<Box<DynPairs<K, dyn UpdateTrait<V, U>>>>>
pub fn input_upsert<B>(
&self,
persistent_id: Option<&str>,
factories: &InputUpsertFactories<B, U>,
patch_func: PatchFunc<V, U>,
) -> Stream<RootCircuit, B>
where
B: IndexedZSet<Key = K, Val = V>,
Convert an input stream of upserts into a stream of updates to a relation.
The input stream carries changes to a key/value map in the form of upserts. An upsert assigns a new value to a key (or deletes the key from the map) without explicitly removing the old value, if any. The operator converts upserts into batches of updates, which is the input format of most DBSP operators.
The operator assumes that the input vector is sorted by key; however,
unlike the Stream::upsert operator, it allows the vector to
contain multiple updates per key. Updates are applied one by one in
order, and the output of the operator reflects the cumulative effect of
the updates. Additionally, unlike the Stream::upsert operator,
which supports only insertions (which overwrite the entire value with a
new value) and deletions, this operator also supports updates that
modify the contents of a value, e.g., overwriting some of its
fields. Type argument U defines the format of modifications,
and the patch_func function applies an update of type U to a value of
type V.
This is a stateful operator that internally maintains the trace of the collection.
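The conversion from upserts to updates can be sketched with standard-library types (illustrative only; the name apply_upserts is hypothetical, and this sketch uses whole-value upserts rather than a patch function):

```rust
use std::collections::HashMap;

// Illustrative sketch of upsert-to-update conversion. Each upsert either
// assigns a new value to a key (Some) or deletes the key (None); the output
// is a batch of (key, value, weight) updates, where weight -1 retracts the
// old value and weight +1 inserts the new one.
fn apply_upserts(
    state: &mut HashMap<&'static str, i64>,
    upserts: &[(&'static str, Option<i64>)],
) -> Vec<(&'static str, i64, i64)> {
    let mut updates = Vec::new();
    for &(key, new) in upserts {
        if let Some(old) = state.get(key).copied() {
            updates.push((key, old, -1)); // retract the previous value, if any
        }
        match new {
            Some(v) => {
                state.insert(key, v);
                updates.push((key, v, 1)); // insert the new value
            }
            None => {
                state.remove(key);
            }
        }
    }
    updates
}

fn main() {
    let mut state = HashMap::new();
    assert_eq!(apply_upserts(&mut state, &[("x", Some(1))]), vec![("x", 1, 1)]);
    // Multiple upserts for the same key are applied in order; the output
    // reflects their cumulative effect.
    assert_eq!(
        apply_upserts(&mut state, &[("x", Some(5)), ("x", None)]),
        vec![("x", 1, -1), ("x", 5, 1), ("x", 5, -1)]
    );
}
```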
pub fn input_upsert_with_waterline<B, W, E>(
&self,
persistent_id: Option<&str>,
factories: &InputUpsertWithWaterlineFactories<B, U, E>,
patch_func: PatchFunc<V, U>,
init_waterline: Box<dyn Fn() -> Box<W>>,
extract_ts: Box<dyn Fn(&B::Key, &B::Val, &mut W)>,
least_upper_bound: LeastUpperBoundFunc<W>,
filter_func: Box<dyn Fn(&W, &B::Key, &B::Val) -> bool>,
report_func: Box<dyn Fn(&W, &B::Key, &B::Val, ZWeight, &mut E)>,
) -> (Stream<RootCircuit, B>, Stream<RootCircuit, OrdZSet<E>>, Stream<RootCircuit, Box<W>>)
where
B: IndexedZSet<Key = K, Val = V>,
W: DataTrait + Checkpoint + ?Sized,
E: DataTrait + ?Sized,
Box<W>: Checkpoint + Clone + NumEntries + Rkyv,
impl<C, I1> Stream<C, I1>
where
C: Circuit,
pub fn dyn_stream_join<I2, V>(
&self,
factories: &StreamJoinFactories<I1, I2, OrdZSet<V>>,
other: &Stream<C, I2>,
join: JoinFunc<I1::Key, I1::Val, I2::Val, V, DynUnit>,
) -> Stream<C, OrdZSet<V>>
See Stream::stream_join.
pub fn dyn_stream_join_generic<I2, Z>(
&self,
factories: &StreamJoinFactories<I1, I2, Z>,
other: &Stream<C, I2>,
join: JoinFunc<I1::Key, I1::Val, I2::Val, Z::Key, Z::Val>,
) -> Stream<C, Z>
Like Self::dyn_stream_join, but can return any batch type.
pub fn dyn_monotonic_stream_join<I2, Z>(
&self,
factories: &StreamJoinFactories<I1, I2, Z>,
other: &Stream<C, I2>,
join: JoinFunc<I1::Key, I1::Val, I2::Val, Z::Key, DynUnit>,
) -> Stream<C, Z>
pub fn dyn_stream_antijoin<I2>( &self, factories: &StreamAntijoinFactories<I1, OrdZSet<I2::Key>>, other: &Stream<C, I2>, ) -> Self
impl Stream<RootCircuit, MonoIndexedZSet>
pub fn dyn_join_mono( &self, factories: &JoinFactories<MonoIndexedZSet, MonoIndexedZSet, (), OrdZSet<DynData>>, other: &Stream<RootCircuit, MonoIndexedZSet>, join_funcs: TraceJoinFuncs<DynData, DynData, DynData, DynData, DynUnit>, ) -> Stream<RootCircuit, MonoZSet>
pub fn dyn_join_index_mono( &self, factories: &JoinFactories<MonoIndexedZSet, MonoIndexedZSet, (), MonoIndexedZSet>, other: &Stream<RootCircuit, MonoIndexedZSet>, join_funcs: TraceJoinFuncs<DynData, DynData, DynData, DynData, DynData>, ) -> Stream<RootCircuit, MonoIndexedZSet>
pub fn dyn_join_mono_balanced( &self, factories: &JoinFactories<MonoIndexedZSet, MonoIndexedZSet, (), OrdZSet<DynData>>, other: &Stream<RootCircuit, MonoIndexedZSet>, join_funcs: TraceJoinFuncs<DynData, DynData, DynData, DynData, DynUnit>, ) -> Stream<RootCircuit, MonoZSet>
pub fn dyn_join_index_mono_balanced( &self, factories: &JoinFactories<MonoIndexedZSet, MonoIndexedZSet, (), MonoIndexedZSet>, other: &Stream<RootCircuit, MonoIndexedZSet>, join_funcs: TraceJoinFuncs<DynData, DynData, DynData, DynData, DynData>, ) -> Stream<RootCircuit, MonoIndexedZSet>
pub fn dyn_antijoin_mono( &self, factories: &AntijoinFactories<MonoIndexedZSet, MonoZSet, ()>, other: &Stream<RootCircuit, MonoIndexedZSet>, ) -> Stream<RootCircuit, MonoIndexedZSet>
impl Stream<NestedCircuit, MonoIndexedZSet>
pub fn dyn_join_mono( &self, factories: &JoinFactories<MonoIndexedZSet, MonoIndexedZSet, <NestedCircuit as WithClock>::Time, OrdZSet<DynData>>, other: &Stream<NestedCircuit, MonoIndexedZSet>, join_funcs: TraceJoinFuncs<DynData, DynData, DynData, DynData, DynUnit>, ) -> Stream<NestedCircuit, MonoZSet>
pub fn dyn_join_index_mono( &self, factories: &JoinFactories<MonoIndexedZSet, MonoIndexedZSet, <NestedCircuit as WithClock>::Time, MonoIndexedZSet>, other: &Stream<NestedCircuit, MonoIndexedZSet>, join_funcs: TraceJoinFuncs<DynData, DynData, DynData, DynData, DynData>, ) -> Stream<NestedCircuit, MonoIndexedZSet>
pub fn dyn_antijoin_mono( &self, factories: &AntijoinFactories<MonoIndexedZSet, MonoZSet, <NestedCircuit as WithClock>::Time>, other: &Stream<NestedCircuit, MonoIndexedZSet>, ) -> Stream<NestedCircuit, MonoIndexedZSet>
impl<C, I1> Stream<C, I1>
where
C: Circuit,
I1: IndexedZSet,
pub fn dyn_join<I2, V>(
&self,
factories: &JoinFactories<I1, I2, C::Time, OrdZSet<V>>,
other: &Stream<C, I2>,
join_funcs: TraceJoinFuncs<I1::Key, I1::Val, I2::Val, V, DynUnit>,
) -> Stream<C, OrdZSet<V>>
See Stream::join.
pub fn dyn_join_index<I2, K, V>(
&self,
factories: &JoinFactories<I1, I2, C::Time, OrdIndexedZSet<K, V>>,
other: &Stream<C, I2>,
join_funcs: TraceJoinFuncs<I1::Key, I1::Val, I2::Val, K, V>,
) -> Stream<C, OrdIndexedZSet<K, V>>
See Stream::join_index.
pub fn dyn_join_generic<I2, Z>(
&self,
factories: &JoinFactories<I1, I2, C::Time, Z>,
other: &Stream<C, I2>,
join_funcs: TraceJoinFuncs<I1::Key, I1::Val, I2::Val, Z::Key, Z::Val>,
) -> Stream<C, Z>
Like Self::dyn_join_index, but can return any indexed Z-set type.
pub fn dyn_antijoin<I2>(
&self,
factories: &AntijoinFactories<I1, OrdZSet<I2::Key>, C::Time>,
other: &Stream<C, I2>,
) -> Stream<C, I1>
where
I2: IndexedZSet<Key = I1::Key> + DynFilterMap + Send,
Box<I1::Key>: Clone,
Box<I1::Val>: Clone,
See Stream::antijoin.
impl<I1> Stream<RootCircuit, I1>
where
I1: IndexedZSet,
pub fn dyn_join_generic_balanced<I2, Z>( &self, factories: &JoinFactories<I1, I2, (), Z>, other: &Stream<RootCircuit, I2>, join_funcs: TraceJoinFuncs<I1::Key, I1::Val, I2::Val, Z::Key, Z::Val>, ) -> Stream<RootCircuit, Z>
impl<C, Z> Stream<C, Z>
where
C: Circuit,
Z: IndexedZSet,
pub fn dyn_outer_join<Z2, O>(
&self,
factories: &OuterJoinFactories<Z, Z2, C::Time, O>,
other: &Stream<C, Z2>,
join_funcs: TraceJoinFuncs<Z::Key, Z::Val, Z2::Val, O, DynUnit>,
left_func: Box<dyn FnMut(<Z as DynFilterMap>::DynItemRef<'_>, &mut dyn FnMut(&mut O, &mut DynUnit))>,
right_func: Box<dyn FnMut(<Z2 as DynFilterMap>::DynItemRef<'_>, &mut dyn FnMut(&mut O, &mut DynUnit))>,
) -> Stream<C, OrdZSet<O>>
See Stream::outer_join.
pub fn dyn_outer_join_default<Z2, O>(
&self,
factories: &OuterJoinFactories<Z, Z2, C::Time, O>,
other: &Stream<C, Z2>,
join_funcs: TraceJoinFuncs<Z::Key, Z::Val, Z2::Val, O, DynUnit>,
) -> Stream<C, OrdZSet<O>>
where
Z: for<'a> DynFilterMap<DynItemRef<'a> = (&'a <Z as BatchReader>::Key, &'a <Z as BatchReader>::Val)> + Send,
Z2: IndexedZSet<Key = Z::Key> + Send + for<'a> DynFilterMap<DynItemRef<'a> = (&'a <Z2 as BatchReader>::Key, &'a <Z2 as BatchReader>::Val)>,
O: DataTrait + ?Sized,
Box<Z::Key>: Clone,
Box<Z::Val>: Clone,
Box<Z2::Val>: Clone,
Like Stream::dyn_outer_join, but uses the default value for the missing side of the join.
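The "default value for the missing side" behavior can be illustrated with plain maps (a sketch assuming i64 values, not the DBSP operator itself): keys present on only one side are joined against the other side's default value.

```rust
use std::collections::BTreeMap;

// Illustrative only: full outer join where a key missing from one side is
// paired with that side's default value (0 for i64).
fn outer_join_default(
    left: &BTreeMap<&'static str, i64>,
    right: &BTreeMap<&'static str, i64>,
) -> Vec<(&'static str, i64, i64)> {
    let mut out = Vec::new();
    for (&k, &lv) in left {
        // A matching key joins normally; a missing right side uses the default.
        let rv = right.get(k).copied().unwrap_or_default();
        out.push((k, lv, rv));
    }
    for (&k, &rv) in right {
        if !left.contains_key(k) {
            out.push((k, i64::default(), rv)); // missing left side
        }
    }
    out
}

fn main() {
    let left = BTreeMap::from([("a", 1), ("b", 2)]);
    let right = BTreeMap::from([("b", 20), ("c", 30)]);
    assert_eq!(
        outer_join_default(&left, &right),
        vec![("a", 1, 0), ("b", 2, 20), ("c", 0, 30)]
    );
}
```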
impl<C, I1> Stream<C, I1>
where
C: Circuit,
pub fn dyn_stream_join_range<I2, V>(
&self,
factories: &StreamJoinRangeFactories<I2, OrdZSet<V>>,
other: &Stream<C, I2>,
range_func: Box<dyn Fn(&I1::Key, &mut I2::Key, &mut I2::Key)>,
join_func: Box<dyn Fn(&I1::Key, &I1::Val, &I2::Key, &I2::Val, &mut dyn FnMut(&mut V, &mut DynUnit))>,
) -> Stream<C, OrdZSet<V>>
pub fn dyn_stream_join_range_index<K, V, I2>(
&self,
factories: &StreamJoinRangeFactories<I2, OrdIndexedZSet<K, V>>,
other: &Stream<C, I2>,
range_func: Box<dyn Fn(&I1::Key, &mut I2::Key, &mut I2::Key)>,
join_func: Box<dyn Fn(&I1::Key, &I1::Val, &I2::Key, &I2::Val, &mut dyn FnMut(&mut K, &mut V))>,
) -> Stream<C, OrdIndexedZSet<K, V>>
where
I1: IndexedZSetReader + Clone,
I2: IndexedZSetReader + Clone,
K: DataTrait + ?Sized,
V: DataTrait + ?Sized,
pub fn dyn_stream_join_range_generic<I2, O>(
&self,
factories: &StreamJoinRangeFactories<I2, O>,
other: &Stream<C, I2>,
range_func: Box<dyn Fn(&I1::Key, &mut I2::Key, &mut I2::Key)>,
join_func: Box<dyn Fn(&I1::Key, &I1::Val, &I2::Key, &I2::Val, &mut dyn FnMut(&mut O::Key, &mut O::Val))>,
) -> Stream<C, O>
Like Self::dyn_stream_join_range, but can return any indexed Z-set
type.
impl Stream<RootCircuit, MonoIndexedZSet>
pub fn dyn_star_join_index_mono(
&self,
factories: &StarJoinFactories<MonoIndexedZSet, MonoIndexedZSet, ()>,
others: &[(Stream<RootCircuit, MonoIndexedZSet>, bool)],
join_funcs: &[StarJoinFunc<RootCircuit, MonoIndexedZSet, DynData, DynData>],
) -> Stream<RootCircuit, MonoIndexedZSet>
See define_star_join_index!.
pub fn dyn_star_join_mono(
&self,
factories: &StarJoinFactories<MonoIndexedZSet, MonoZSet, ()>,
others: &[(Stream<RootCircuit, MonoIndexedZSet>, bool)],
join_funcs: &[StarJoinFunc<RootCircuit, MonoIndexedZSet, DynData, DynUnit>],
) -> Stream<RootCircuit, MonoZSet>
See define_star_join!.
pub fn dyn_inner_star_join_index_mono(
&self,
factories: &StarJoinFactories<MonoIndexedZSet, MonoIndexedZSet, ()>,
others: &[Stream<RootCircuit, MonoIndexedZSet>],
join_funcs: &[StarJoinFunc<RootCircuit, MonoIndexedZSet, DynData, DynData>],
) -> Stream<RootCircuit, MonoIndexedZSet>
See define_inner_star_join_index!.
Sourcepub fn dyn_inner_star_join_mono(
&self,
factories: &StarJoinFactories<MonoIndexedZSet, MonoZSet, ()>,
others: &[Stream<RootCircuit, MonoIndexedZSet>],
join_funcs: &[StarJoinFunc<RootCircuit, MonoIndexedZSet, DynData, DynUnit>],
) -> Stream<RootCircuit, MonoZSet>
pub fn dyn_inner_star_join_mono( &self, factories: &StarJoinFactories<MonoIndexedZSet, MonoZSet, ()>, others: &[Stream<RootCircuit, MonoIndexedZSet>], join_funcs: &[StarJoinFunc<RootCircuit, MonoIndexedZSet, DynData, DynUnit>], ) -> Stream<RootCircuit, MonoZSet>
See define_inner_star_join!.
Source§impl Stream<NestedCircuit, MonoIndexedZSet>
impl Stream<NestedCircuit, MonoIndexedZSet>
Sourcepub fn dyn_inner_star_join_index_mono(
&self,
factories: &StarJoinFactories<MonoIndexedZSet, MonoIndexedZSet, <NestedCircuit as WithClock>::Time>,
others: &[Stream<NestedCircuit, MonoIndexedZSet>],
join_funcs: &[StarJoinFunc<NestedCircuit, MonoIndexedZSet, DynData, DynData>],
) -> Stream<NestedCircuit, MonoIndexedZSet>
pub fn dyn_inner_star_join_index_mono( &self, factories: &StarJoinFactories<MonoIndexedZSet, MonoIndexedZSet, <NestedCircuit as WithClock>::Time>, others: &[Stream<NestedCircuit, MonoIndexedZSet>], join_funcs: &[StarJoinFunc<NestedCircuit, MonoIndexedZSet, DynData, DynData>], ) -> Stream<NestedCircuit, MonoIndexedZSet>
See define_inner_star_join_index!.
Sourcepub fn dyn_inner_star_join_mono(
&self,
factories: &StarJoinFactories<MonoIndexedZSet, MonoZSet, <NestedCircuit as WithClock>::Time>,
others: &[Stream<NestedCircuit, MonoIndexedZSet>],
join_funcs: &[StarJoinFunc<NestedCircuit, MonoIndexedZSet, DynData, DynUnit>],
) -> Stream<NestedCircuit, MonoZSet>
pub fn dyn_inner_star_join_mono( &self, factories: &StarJoinFactories<MonoIndexedZSet, MonoZSet, <NestedCircuit as WithClock>::Time>, others: &[Stream<NestedCircuit, MonoIndexedZSet>], join_funcs: &[StarJoinFunc<NestedCircuit, MonoIndexedZSet, DynData, DynUnit>], ) -> Stream<NestedCircuit, MonoZSet>
See define_inner_star_join!.
Source§impl<K, V> Stream<RootCircuit, OrdIndexedZSet<K, V>>
impl<K, V> Stream<RootCircuit, OrdIndexedZSet<K, V>>
pub fn dyn_star_join<O>(
&self,
factories: &StarJoinFactories<OrdIndexedZSet<K, V>, O, ()>,
others: &[(Self, bool)],
join_funcs: &[StarJoinFunc<RootCircuit, OrdIndexedZSet<K, V>, O::Key, O::Val>],
) -> Stream<RootCircuit, O>where
O: IndexedZSet,
Source§impl<C, K, V> Stream<C, OrdIndexedZSet<K, V>>
impl<C, K, V> Stream<C, OrdIndexedZSet<K, V>>
pub fn dyn_inner_star_join<O>(
&self,
factories: &StarJoinFactories<OrdIndexedZSet<K, V>, O, C::Time>,
others: &[Self],
join_funcs: &[StarJoinFunc<C, OrdIndexedZSet<K, V>, O::Key, O::Val>],
) -> Stream<C, O>where
O: IndexedZSet,
Source§impl<B> Stream<RootCircuit, B>where
B: IndexedZSet,
impl<B> Stream<RootCircuit, B>where
B: IndexedZSet,
Sourcepub fn dyn_neighborhood(
&self,
factories: &NeighborhoodFactories<B>,
neighborhood_descr: &NeighborhoodDescrStream<B::Key, B::Val>,
) -> NeighborhoodStream<B::Key, B::Val>
pub fn dyn_neighborhood( &self, factories: &NeighborhoodFactories<B>, neighborhood_descr: &NeighborhoodDescrStream<B::Key, B::Val>, ) -> NeighborhoodStream<B::Key, B::Val>
Returns a small contiguous range of rows (DynNeighborhood) of the input
table.
This operator helps to visualize the contents of the input table in a
UI. The UI client may not have enough throughput/memory to store the
entire table, and will instead limit its state to a small range of
rows that fit on the screen. We specify such a range, or
neighborhood, in terms of its center (or “anchor”), and the number
of rows preceding and following the anchor (see
NeighborhoodDescr). The user may be interested in a static
snapshot of the neighborhood or in a changing view. Both modes are
supported by this operator (see the reset argument). The output of
the operator is a stream of DynNeighborhoods.
NOTE: This operator assumes that the integral of the input stream does not contain negative weights (which should normally be the case) and may produce incorrect outputs otherwise.
§Arguments
- self - a stream of changes to an indexed Z-set.
- neighborhood_descr - contains the neighborhood descriptor to evaluate at every clock tick. Set to None to disable the operator (it will output an empty neighborhood).
§Output
Outputs a stream of changes to the neighborhood.
The output neighborhood will contain rows with indexes between
-descr.before and descr.after - 1. Row 0 is the anchor row, i.e., the
first row in the input stream greater than or equal to
descr.anchor. If there is no such row (i.e., all rows in the input
stream are smaller than the anchor), then the neighborhood will only
contain negative indexes.
The first index in the neighborhood may be greater
than -descr.before if the input stream doesn’t contain enough rows
preceding the specified anchor. The last index may be smaller than
descr.after - 1 if the input stream doesn’t contain descr.after
rows following the anchor point.
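The index computation above can be illustrated with a minimal standalone sketch (not the DBSP implementation) that uses a plain sorted slice in place of an indexed Z-set:

```rust
// Hypothetical simplification of the neighborhood semantics: given sorted
// rows, return (index, row) pairs where index 0 is the first row >= anchor,
// clamped to `before` rows preceding and `after` rows from the anchor onward.
fn neighborhood(rows: &[i32], anchor: i32, before: usize, after: usize) -> Vec<(isize, i32)> {
    // Position of the anchor row: the first row greater than or equal to `anchor`.
    let pos = rows.partition_point(|&r| r < anchor);
    // Clamp the range to the rows actually present in the input.
    let start = pos.saturating_sub(before);
    let end = (pos + after).min(rows.len());
    rows[start..end]
        .iter()
        .enumerate()
        .map(|(i, &r)| ((start + i) as isize - pos as isize, r))
        .collect()
}
```

With rows `[10, 20, 30, 40, 50]`, anchor `30`, `before = 2`, `after = 2`, this yields indexes `-2..=1`; with an anchor past the last row, only negative indexes remain, matching the description above.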
Source§impl Stream<RootCircuit, MonoIndexedZSet>
impl Stream<RootCircuit, MonoIndexedZSet>
pub fn dyn_left_join_mono( &self, factories: &JoinFactories<MonoIndexedZSet, MonoIndexedZSet, (), MonoZSet>, other: &Stream<RootCircuit, MonoIndexedZSet>, join_funcs: TraceJoinFuncs<DynData, DynData, DynData, DynData, DynUnit>, ) -> Stream<RootCircuit, MonoZSet>
pub fn dyn_left_join_index_mono( &self, factories: &JoinFactories<MonoIndexedZSet, MonoIndexedZSet, (), MonoIndexedZSet>, other: &Stream<RootCircuit, MonoIndexedZSet>, join_funcs: TraceJoinFuncs<DynData, DynData, DynData, DynData, DynData>, ) -> Stream<RootCircuit, MonoIndexedZSet>
pub fn dyn_left_join_balanced_mono( &self, factories: &JoinFactories<MonoIndexedZSet, MonoIndexedZSet, (), MonoZSet>, other: &Stream<RootCircuit, MonoIndexedZSet>, join_funcs: TraceJoinFuncs<DynData, DynData, DynData, DynData, DynUnit>, ) -> Stream<RootCircuit, MonoZSet>
pub fn dyn_left_join_balanced_index_mono( &self, factories: &JoinFactories<MonoIndexedZSet, MonoIndexedZSet, (), MonoIndexedZSet>, other: &Stream<RootCircuit, MonoIndexedZSet>, join_funcs: TraceJoinFuncs<DynData, DynData, DynData, DynData, DynData>, ) -> Stream<RootCircuit, MonoIndexedZSet>
Source§impl<K, V1> Stream<RootCircuit, OrdIndexedZSet<K, V1>>
impl<K, V1> Stream<RootCircuit, OrdIndexedZSet<K, V1>>
Sourcepub fn dyn_left_join<V2, Z>(
&self,
factories: &JoinFactories<OrdIndexedZSet<K, V1>, OrdIndexedZSet<K, V2>, (), Z>,
other: &Stream<RootCircuit, OrdIndexedZSet<K, V2>>,
join_funcs: TraceJoinFuncs<K, V1, V2, Z::Key, Z::Val>,
) -> Stream<RootCircuit, Z>
pub fn dyn_left_join<V2, Z>( &self, factories: &JoinFactories<OrdIndexedZSet<K, V1>, OrdIndexedZSet<K, V2>, (), Z>, other: &Stream<RootCircuit, OrdIndexedZSet<K, V2>>, join_funcs: TraceJoinFuncs<K, V1, V2, Z::Key, Z::Val>, ) -> Stream<RootCircuit, Z>
Left-join self with other.
For any key on the left that is not present on the right, passes the default
value of V2 to the join function. This assumes that the underlying concrete
type is Option<T> and that the other stream does not contain any None values.
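A minimal standalone sketch of this semantics (not the DBSP implementation), using `Option<i32>` as the `V2` type whose default (`None`) stands in for the missing side:

```rust
use std::collections::BTreeMap;

// Keys present only on the left are joined against V2::default(), i.e. None.
// `left` and `right` are hypothetical flattened stand-ins for indexed Z-sets.
fn left_join(
    left: &BTreeMap<&str, i32>,
    right: &BTreeMap<&str, Option<i32>>,
) -> Vec<(String, i32, Option<i32>)> {
    left.iter()
        .map(|(k, v1)| {
            // Missing key on the right: fall back to the default value (None).
            let v2 = right.get(k).copied().flatten();
            (k.to_string(), *v1, v2)
        })
        .collect()
}
```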
Source§impl<K, V1> Stream<RootCircuit, OrdIndexedZSet<K, V1>>
impl<K, V1> Stream<RootCircuit, OrdIndexedZSet<K, V1>>
pub fn dyn_left_join_balanced<V2, Z>( &self, factories: &JoinFactories<OrdIndexedZSet<K, V1>, OrdIndexedZSet<K, V2>, (), Z>, other: &Stream<RootCircuit, OrdIndexedZSet<K, V2>>, join_funcs: TraceJoinFuncs<K, V1, V2, Z::Key, Z::Val>, ) -> Stream<RootCircuit, Z>
Source§impl<B> Stream<RootCircuit, B>where
B: IndexedZSetReader + Clone,
impl<B> Stream<RootCircuit, B>where
B: IndexedZSetReader + Clone,
Sourcepub fn dyn_stream_sample_keys(
&self,
output_factories: &VecZSetFactories<B::Key>,
sample_size: &Stream<RootCircuit, usize>,
) -> Stream<RootCircuit, VecZSet<B::Key>>
pub fn dyn_stream_sample_keys( &self, output_factories: &VecZSetFactories<B::Key>, sample_size: &Stream<RootCircuit, usize>, ) -> Stream<RootCircuit, VecZSet<B::Key>>
Sourcepub fn dyn_stream_sample_unique_key_vals(
&self,
factories: &StreamSampleUniqueKeyValsFactories<B>,
sample_size: &Stream<RootCircuit, usize>,
) -> Stream<RootCircuit, VecZSet<DynPair<B::Key, B::Val>>>
pub fn dyn_stream_sample_unique_key_vals( &self, factories: &StreamSampleUniqueKeyValsFactories<B>, sample_size: &Stream<RootCircuit, usize>, ) -> Stream<RootCircuit, VecZSet<DynPair<B::Key, B::Val>>>
Sourcepub fn dyn_stream_key_quantiles(
&self,
output_factories: &VecZSetFactories<B::Key>,
num_quantiles: &Stream<RootCircuit, usize>,
) -> Stream<RootCircuit, VecZSet<B::Key>>
pub fn dyn_stream_key_quantiles( &self, output_factories: &VecZSetFactories<B::Key>, num_quantiles: &Stream<RootCircuit, usize>, ) -> Stream<RootCircuit, VecZSet<B::Key>>
Sourcepub fn dyn_stream_unique_key_val_quantiles(
&self,
factories: &StreamSampleUniqueKeyValsFactories<B>,
num_quantiles: &Stream<RootCircuit, usize>,
) -> Stream<RootCircuit, VecZSet<DynPair<B::Key, B::Val>>>
pub fn dyn_stream_unique_key_val_quantiles( &self, factories: &StreamSampleUniqueKeyValsFactories<B>, num_quantiles: &Stream<RootCircuit, usize>, ) -> Stream<RootCircuit, VecZSet<DynPair<B::Key, B::Val>>>
Source§impl<K, V> Stream<RootCircuit, OrdIndexedZSet<K, V>>
impl<K, V> Stream<RootCircuit, OrdIndexedZSet<K, V>>
Sourcepub fn dyn_saturate(
&self,
factories: &<OrdIndexedZSet<K, V> as BatchReader>::Factories,
) -> Stream<RootCircuit, Option<SpineSnapshot<OrdIndexedZSet<K, V>>>>
pub fn dyn_saturate( &self, factories: &<OrdIndexedZSet<K, V> as BatchReader>::Factories, ) -> Stream<RootCircuit, Option<SpineSnapshot<OrdIndexedZSet<K, V>>>>
Saturate the input stream by adding a ghost (k, None) tuple for each key not present in the trace of the input stream.
This is an auxiliary operator used to implement incremental outer joins. The idea is to convert, e.g., a left join into an inner join by simulating that every key in the right side of the join is always present. We do this without materializing the entire universe of keys by:
- Providing a modified cursor over the integral of the right side, which returns missing keys on demand (see SaturatingCursor).
- Augmenting the change stream with the missing records (this operator), specifically:
  - When the first value for a key is added to the collection, we inject a retraction of the ghost tuple ((k, None), -1).
  - When the last value for a key is removed from the collection, we inject an addition of the ghost tuple ((k, None), +1).
Caveat: In order to faithfully implement the saturated stream, we’d have to output ghost tuples for all missing keys during the first step. We don’t do this, relying on the fact that the left join essentially ignores the first delta in the right stream (by joining with the delayed integral of the left side, which is always empty in the first step, since it is the output of a delay operator).
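The ghost-tuple bookkeeping can be sketched as a single standalone function (an illustration, not the DBSP code): given how many values a key had before and after a delta is applied, it returns the weight adjustment for the (k, None) ghost tuple.

```rust
// Returns the weight change for the ghost tuple (k, None) of one key,
// given the number of values associated with that key before and after
// applying a delta; 0 means no adjustment is needed.
fn ghost_weight_delta(values_before: usize, values_after: usize) -> i64 {
    match (values_before, values_after) {
        (0, n) if n > 0 => -1, // first value added: retract the ghost tuple
        (n, 0) if n > 0 => 1,  // last value removed: re-insert the ghost tuple
        _ => 0,                // key stays present or stays absent: no change
    }
}
```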
pub fn dyn_saturate_balanced( &self, factories: &<OrdIndexedZSet<K, V> as BatchReader>::Factories, ) -> Stream<RootCircuit, Option<SpineSnapshot<OrdIndexedZSet<K, V>>>>
Source§impl<C, Pairs> Stream<C, Pairs>where
C: Circuit,
impl<C, Pairs> Stream<C, Pairs>where
C: Circuit,
Sourcepub fn dyn_semijoin_stream<Keys, Out>(
&self,
factories: &SemijoinStreamFactories<Pairs, Keys, Out>,
keys: &Stream<C, Keys>,
) -> Stream<C, Out>
pub fn dyn_semijoin_stream<Keys, Out>( &self, factories: &SemijoinStreamFactories<Pairs, Keys, Out>, keys: &Stream<C, Keys>, ) -> Stream<C, Out>
Source§impl<Z> Stream<RootCircuit, Z>where
Z: Clone + 'static,
impl<Z> Stream<RootCircuit, Z>where
Z: Clone + 'static,
Sourcepub fn partitioned_tree_aggregate<TS, V, Acc, Out>(
&self,
persistent_id: Option<&str>,
factories: &PartitionedTreeAggregateFactories<TS, V, Z, OrdIndexedZSet<<Z as BatchReader>::Key, DynPair<DynDataTyped<Prefix<TS>>, dyn TreeNodeTrait<TS, Acc>>>, Acc>,
aggregator: &dyn DynAggregator<V, (), DynZWeight, Accumulator = Acc, Output = Out>,
) -> Stream<RootCircuit, OrdIndexedZSet<Z::Key, DynPair<DynDataTyped<Prefix<TS>>, dyn TreeNodeTrait<TS, Acc>>>>
pub fn partitioned_tree_aggregate<TS, V, Acc, Out>( &self, persistent_id: Option<&str>, factories: &PartitionedTreeAggregateFactories<TS, V, Z, OrdIndexedZSet<<Z as BatchReader>::Key, DynPair<DynDataTyped<Prefix<TS>>, dyn TreeNodeTrait<TS, Acc>>>, Acc>, aggregator: &dyn DynAggregator<V, (), DynZWeight, Accumulator = Acc, Output = Out>, ) -> Stream<RootCircuit, OrdIndexedZSet<Z::Key, DynPair<DynDataTyped<Prefix<TS>>, dyn TreeNodeTrait<TS, Acc>>>>
Given a batch of updates to a partitioned time series stream, computes a stream of updates to its partitioned radix tree.
This is a building block for higher-level operators such as
Stream::partitioned_rolling_aggregate.
Sourcepub fn partitioned_tree_aggregate_generic<TS, V, Acc, Out, O>(
&self,
persistent_id: Option<&str>,
factories: &PartitionedTreeAggregateFactories<TS, V, Z, O, Acc>,
aggregator: &dyn DynAggregator<V, (), DynZWeight, Accumulator = Acc, Output = Out>,
) -> Stream<RootCircuit, O>
pub fn partitioned_tree_aggregate_generic<TS, V, Acc, Out, O>( &self, persistent_id: Option<&str>, factories: &PartitionedTreeAggregateFactories<TS, V, Z, O, Acc>, aggregator: &dyn DynAggregator<V, (), DynZWeight, Accumulator = Acc, Output = Out>, ) -> Stream<RootCircuit, O>
Like Self::partitioned_tree_aggregate, but can return any
partitioned batch type.
This is a building block for higher-level operators such as
Stream::partitioned_rolling_aggregate.
Source§impl<C, Z, TS> Stream<C, Z>
impl<C, Z, TS> Stream<C, Z>
Sourcepub fn tree_aggregate<Acc, Out>(
&self,
persistent_id: Option<&str>,
factories: &TreeAggregateFactories<TS, Z, OrdIndexedZSet<DynDataTyped<Prefix<TS>>, dyn TreeNodeTrait<TS, Acc>>, Acc>,
aggregator: &dyn DynAggregator<Z::Val, (), Z::R, Accumulator = Acc, Output = Out>,
) -> Stream<C, OrdIndexedZSet<DynDataTyped<Prefix<TS>>, dyn TreeNodeTrait<TS, Acc>>>
pub fn tree_aggregate<Acc, Out>( &self, persistent_id: Option<&str>, factories: &TreeAggregateFactories<TS, Z, OrdIndexedZSet<DynDataTyped<Prefix<TS>>, dyn TreeNodeTrait<TS, Acc>>, Acc>, aggregator: &dyn DynAggregator<Z::Val, (), Z::R, Accumulator = Acc, Output = Out>, ) -> Stream<C, OrdIndexedZSet<DynDataTyped<Prefix<TS>>, dyn TreeNodeTrait<TS, Acc>>>
Given a batch of updates to a time series stream, computes a stream of updates to its radix tree.
This is intended as a building block for higher-level operators.
§Limitations
Unlike Stream::partitioned_tree_aggregate(), this operator is
currently not parallelized, performing all work in a single worker
thread.
Sourcepub fn tree_aggregate_generic<Acc, Out, O>(
&self,
persistent_id: Option<&str>,
factories: &TreeAggregateFactories<TS, Z, O, Acc>,
aggregator: &dyn DynAggregator<Z::Val, (), DynZWeight, Accumulator = Acc, Output = Out>,
) -> Stream<C, O>
pub fn tree_aggregate_generic<Acc, Out, O>( &self, persistent_id: Option<&str>, factories: &TreeAggregateFactories<TS, Z, O, Acc>, aggregator: &dyn DynAggregator<Z::Val, (), DynZWeight, Accumulator = Acc, Output = Out>, ) -> Stream<C, O>
Like Self::tree_aggregate, but can return any batch type.
Source§impl<B> Stream<RootCircuit, B>where
B: IndexedZSet,
impl<B> Stream<RootCircuit, B>where
B: IndexedZSet,
Sourcepub fn dyn_partitioned_rolling_aggregate_with_waterline<PK, TS, V, Acc, Out>(
&self,
persistent_id: Option<&str>,
factories: &PartitionedRollingAggregateWithWaterlineFactories<PK, TS, V, Acc, Out, B>,
waterline: &Stream<RootCircuit, Box<DynDataTyped<TS>>>,
partition_func: Box<dyn PartitionFunc<B::Val, PK, V>>,
aggregator: &dyn DynAggregator<V, (), B::R, Accumulator = Acc, Output = Out>,
range: RelRange<TS>,
) -> OrdPartitionedOverStream<PK, DynDataTyped<TS>, Out>where
B: IndexedZSet + for<'a> DynFilterMap<DynItemRef<'a> = (&'a <B as BatchReader>::Key, &'a <B as BatchReader>::Val)>,
Box<B::Key>: Clone,
PK: DataTrait + ?Sized,
TS: DBData + UnsignedPrimInt + Erase<B::Key>,
V: DataTrait + ?Sized,
Acc: DataTrait + ?Sized,
Out: DataTrait + ?Sized,
pub fn dyn_partitioned_rolling_aggregate_with_waterline<PK, TS, V, Acc, Out>(
&self,
persistent_id: Option<&str>,
factories: &PartitionedRollingAggregateWithWaterlineFactories<PK, TS, V, Acc, Out, B>,
waterline: &Stream<RootCircuit, Box<DynDataTyped<TS>>>,
partition_func: Box<dyn PartitionFunc<B::Val, PK, V>>,
aggregator: &dyn DynAggregator<V, (), B::R, Accumulator = Acc, Output = Out>,
range: RelRange<TS>,
) -> OrdPartitionedOverStream<PK, DynDataTyped<TS>, Out>where
B: IndexedZSet + for<'a> DynFilterMap<DynItemRef<'a> = (&'a <B as BatchReader>::Key, &'a <B as BatchReader>::Val)>,
Box<B::Key>: Clone,
PK: DataTrait + ?Sized,
TS: DBData + UnsignedPrimInt + Erase<B::Key>,
V: DataTrait + ?Sized,
Acc: DataTrait + ?Sized,
Out: DataTrait + ?Sized,
Sourcepub fn dyn_partitioned_rolling_aggregate<PK, TS, V, Acc, Out>(
&self,
persistent_id: Option<&str>,
factories: &PartitionedRollingAggregateFactories<TS, V, Acc, Out, OrdPartitionedIndexedZSet<PK, DynDataTyped<TS>, V>, OrdPartitionedIndexedZSet<PK, DynDataTyped<TS>, DynOpt<Out>>>,
partition_func: Box<dyn PartitionFunc<B::Val, PK, V>>,
aggregator: &dyn DynAggregator<V, (), B::R, Accumulator = Acc, Output = Out>,
range: RelRange<TS>,
) -> OrdPartitionedOverStream<PK, DynDataTyped<TS>, Out>where
B: IndexedZSet + for<'a> DynFilterMap<DynItemRef<'a> = (&'a <B as BatchReader>::Key, &'a <B as BatchReader>::Val)>,
Acc: DataTrait + ?Sized,
Out: DataTrait + ?Sized,
PK: DataTrait + ?Sized,
TS: DBData + UnsignedPrimInt + Erase<B::Key>,
V: DataTrait + ?Sized,
pub fn dyn_partitioned_rolling_aggregate<PK, TS, V, Acc, Out>(
&self,
persistent_id: Option<&str>,
factories: &PartitionedRollingAggregateFactories<TS, V, Acc, Out, OrdPartitionedIndexedZSet<PK, DynDataTyped<TS>, V>, OrdPartitionedIndexedZSet<PK, DynDataTyped<TS>, DynOpt<Out>>>,
partition_func: Box<dyn PartitionFunc<B::Val, PK, V>>,
aggregator: &dyn DynAggregator<V, (), B::R, Accumulator = Acc, Output = Out>,
range: RelRange<TS>,
) -> OrdPartitionedOverStream<PK, DynDataTyped<TS>, Out>where
B: IndexedZSet + for<'a> DynFilterMap<DynItemRef<'a> = (&'a <B as BatchReader>::Key, &'a <B as BatchReader>::Val)>,
Acc: DataTrait + ?Sized,
Out: DataTrait + ?Sized,
PK: DataTrait + ?Sized,
TS: DBData + UnsignedPrimInt + Erase<B::Key>,
V: DataTrait + ?Sized,
Like Self::dyn_partitioned_rolling_aggregate, but can return any
batch type.
Sourcepub fn dyn_partitioned_rolling_aggregate_linear<PK, TS, V, A, O>(
&self,
persistent_id: Option<&str>,
factories: &PartitionedRollingAggregateLinearFactories<TS, V, O, A, OrdPartitionedIndexedZSet<PK, DynDataTyped<TS>, V>, OrdPartitionedIndexedZSet<PK, DynDataTyped<TS>, DynOpt<O>>>,
partition_func: Box<dyn PartitionFunc<B::Val, PK, V>>,
f: Box<dyn WeighFunc<V, B::R, A>>,
output_func: Box<dyn AggOutputFunc<A, O>>,
range: RelRange<TS>,
) -> OrdPartitionedOverStream<PK, DynDataTyped<TS>, O>where
B: IndexedZSet + for<'a> DynFilterMap<DynItemRef<'a> = (&'a <B as BatchReader>::Key, &'a <B as BatchReader>::Val)>,
PK: DataTrait + ?Sized,
TS: DBData + UnsignedPrimInt + Erase<B::Key>,
V: DataTrait + ?Sized,
A: WeightTrait + ?Sized,
O: DataTrait + ?Sized,
pub fn dyn_partitioned_rolling_aggregate_linear<PK, TS, V, A, O>(
&self,
persistent_id: Option<&str>,
factories: &PartitionedRollingAggregateLinearFactories<TS, V, O, A, OrdPartitionedIndexedZSet<PK, DynDataTyped<TS>, V>, OrdPartitionedIndexedZSet<PK, DynDataTyped<TS>, DynOpt<O>>>,
partition_func: Box<dyn PartitionFunc<B::Val, PK, V>>,
f: Box<dyn WeighFunc<V, B::R, A>>,
output_func: Box<dyn AggOutputFunc<A, O>>,
range: RelRange<TS>,
) -> OrdPartitionedOverStream<PK, DynDataTyped<TS>, O>where
B: IndexedZSet + for<'a> DynFilterMap<DynItemRef<'a> = (&'a <B as BatchReader>::Key, &'a <B as BatchReader>::Val)>,
PK: DataTrait + ?Sized,
TS: DBData + UnsignedPrimInt + Erase<B::Key>,
V: DataTrait + ?Sized,
A: WeightTrait + ?Sized,
O: DataTrait + ?Sized,
pub fn dyn_partitioned_rolling_average<PK, TS, V, W>(
&self,
persistent_id: Option<&str>,
factories: &PartitionedRollingAverageFactories<TS, V, W, OrdPartitionedIndexedZSet<PK, DynDataTyped<TS>, V>, OrdPartitionedIndexedZSet<PK, DynDataTyped<TS>, DynOpt<V>>>,
partition_func: Box<dyn PartitionFunc<B::Val, PK, V>>,
f: Box<dyn WeighFunc<V, B::R, W>>,
out_func: Box<dyn AggOutputFunc<W, V>>,
range: RelRange<TS>,
) -> OrdPartitionedOverStream<PK, DynDataTyped<TS>, V>where
B: IndexedZSet + for<'a> DynFilterMap<DynItemRef<'a> = (&'a <B as BatchReader>::Key, &'a <B as BatchReader>::Val)>,
PK: DataTrait + ?Sized,
TS: DBData + UnsignedPrimInt + Erase<B::Key>,
V: DataTrait + ?Sized,
W: WeightTrait + ?Sized,
Source§impl Stream<RootCircuit, MonoZSet>
impl Stream<RootCircuit, MonoZSet>
Source§impl<B> Stream<RootCircuit, B>where
B: BatchReader + Clone + 'static,
impl<B> Stream<RootCircuit, B>where
B: BatchReader + Clone + 'static,
Source§impl<B> Stream<RootCircuit, B>where
B: BatchReader + Clone + 'static,
impl<B> Stream<RootCircuit, B>where
B: BatchReader + Clone + 'static,
Sourcepub fn dyn_waterline<TS>(
&self,
persistent_id: Option<&str>,
init: Box<dyn Fn() -> Box<TS>>,
extract_ts: Box<dyn Fn(&B::Key, &B::Val, &mut TS)>,
least_upper_bound: LeastUpperBoundFunc<TS>,
) -> Stream<RootCircuit, Box<TS>>
pub fn dyn_waterline<TS>( &self, persistent_id: Option<&str>, init: Box<dyn Fn() -> Box<TS>>, extract_ts: Box<dyn Fn(&B::Key, &B::Val, &mut TS)>, least_upper_bound: LeastUpperBoundFunc<TS>, ) -> Stream<RootCircuit, Box<TS>>
See Stream::waterline.
Source§impl Stream<RootCircuit, MonoIndexedZSet>
impl Stream<RootCircuit, MonoIndexedZSet>
pub fn dyn_window_mono( &self, factories: &<MonoIndexedZSet as BatchReader>::Factories, inclusive: (bool, bool), bounds: &Stream<RootCircuit, (Box<DynData>, Box<DynData>)>, ) -> Stream<RootCircuit, MonoIndexedZSet>
Source§impl<C, B> Stream<C, B>
impl<C, B> Stream<C, B>
Sourcepub fn dyn_trace(
&self,
output_factories: &<TimedSpine<B, C> as BatchReader>::Factories,
batch_factories: &B::Factories,
) -> Stream<C, TimedSpine<B, C>>
pub fn dyn_trace( &self, output_factories: &<TimedSpine<B, C> as BatchReader>::Factories, batch_factories: &B::Factories, ) -> Stream<C, TimedSpine<B, C>>
See Stream::trace.
Sourcepub fn dyn_trace_with_bound(
&self,
output_factories: &<TimedSpine<B, C> as BatchReader>::Factories,
batch_factories: &B::Factories,
lower_key_bound: TraceBound<B::Key>,
lower_val_bound: TraceBound<B::Val>,
) -> Stream<C, TimedSpine<B, C>>
pub fn dyn_trace_with_bound( &self, output_factories: &<TimedSpine<B, C> as BatchReader>::Factories, batch_factories: &B::Factories, lower_key_bound: TraceBound<B::Key>, lower_val_bound: TraceBound<B::Val>, ) -> Stream<C, TimedSpine<B, C>>
Sourcepub fn dyn_integrate_trace_retain_keys<TS>(
&self,
bounds_stream: &Stream<C, Box<TS>>,
retain_key_func: Box<dyn Fn(&TS) -> Filter<B::Key>>,
)
pub fn dyn_integrate_trace_retain_keys<TS>( &self, bounds_stream: &Stream<C, Box<TS>>, retain_key_func: Box<dyn Fn(&TS) -> Filter<B::Key>>, )
Sourcepub fn dyn_integrate_trace_retain_values<TS>(
&self,
bounds_stream: &Stream<C, Box<TS>>,
retain_val_func: Box<dyn Fn(&TS) -> Filter<B::Val>>,
)
pub fn dyn_integrate_trace_retain_values<TS>( &self, bounds_stream: &Stream<C, Box<TS>>, retain_val_func: Box<dyn Fn(&TS) -> Filter<B::Val>>, )
pub fn dyn_integrate_trace_retain_values_last_n<TS>( &self, bounds_stream: &Stream<C, Box<TS>>, retain_val_func: Box<dyn Fn(&TS) -> Filter<B::Val>>, n: usize, )
pub fn dyn_integrate_trace_retain_values_top_n<TS>( &self, val_factory: &'static dyn Factory<B::Val>, bounds_stream: &Stream<C, Box<TS>>, retain_val_func: Box<dyn Fn(&TS) -> Filter<B::Val>>, n: usize, )
pub fn dyn_integrate_trace( &self, factories: &B::Factories, ) -> Stream<C, Spine<B>>
pub fn dyn_integrate_trace_with_bound( &self, factories: &B::Factories, lower_key_bound: TraceBound<B::Key>, lower_val_bound: TraceBound<B::Val>, ) -> Stream<C, Spine<B>>
Source§impl<C, B> Stream<C, Spine<B>>
impl<C, B> Stream<C, Spine<B>>
pub fn delay_trace(&self) -> Stream<C, SpineSnapshot<B>>
Source§impl<C, K> Stream<C, Box<DynPairs<K, DynOpt<DynUnit>>>>
impl<C, K> Stream<C, Box<DynPairs<K, DynOpt<DynUnit>>>>
Sourcepub fn update_set<B>(
&self,
persistent_id: Option<&str>,
factories: &UpdateSetFactories<<C as WithClock>::Time, B>,
) -> Stream<C, B>where
B: ZSet<Key = K>,
pub fn update_set<B>(
&self,
persistent_id: Option<&str>,
factories: &UpdateSetFactories<<C as WithClock>::Time, B>,
) -> Stream<C, B>where
B: ZSet<Key = K>,
Convert a stream of inserts and deletes into a stream of Z-set updates.
The input stream carries changes to a set in the form of insert and delete commands. The set semantics implies that inserting an element that already exists in the set is a no-op. Likewise, deleting an element that is not in the set is a no-op. This operator converts these commands into batches of updates to a Z-set, which is the input format of most DBSP operators.
The operator assumes that the input vector is sorted by key.
This is a stateful operator that internally maintains the trace of the collection.
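The set semantics described above can be sketched in a few lines of standalone Rust (a simplification, not the DBSP implementation): commands are (key, true) for insert and (key, false) for delete, the maintained state is a plain set, and the output is a Z-set delta of (key, weight) pairs.

```rust
use std::collections::BTreeSet;

// Convert insert/delete commands into a Z-set delta with set semantics:
// redundant inserts and deletes are no-ops and produce no update.
fn update_set(state: &mut BTreeSet<i32>, commands: &[(i32, bool)]) -> Vec<(i32, i64)> {
    let mut delta = Vec::new();
    for &(key, insert) in commands {
        if insert && state.insert(key) {
            delta.push((key, 1)); // key was absent: the insertion takes effect
        } else if !insert && state.remove(&key) {
            delta.push((key, -1)); // key was present: the deletion takes effect
        }
        // inserting an existing key or deleting a missing key emits nothing
    }
    delta
}
```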
Source§impl<C, K, V> Stream<C, Box<DynPairs<K, DynOpt<V>>>>
impl<C, K, V> Stream<C, Box<DynPairs<K, DynOpt<V>>>>
Sourcepub fn upsert<B>(
&self,
persistent_id: Option<&str>,
factories: &UpsertFactories<<C as WithClock>::Time, B>,
) -> Stream<C, B>where
B: IndexedZSet<Key = K, Val = V>,
pub fn upsert<B>(
&self,
persistent_id: Option<&str>,
factories: &UpsertFactories<<C as WithClock>::Time, B>,
) -> Stream<C, B>where
B: IndexedZSet<Key = K, Val = V>,
Convert a stream of upserts into a stream of updates.
The input stream carries changes to a key/value map in the form of
upserts. An upsert assigns a new value to a key (or None to
remove the key from the map) without explicitly removing the old
value, if any. Upserts are produced by some operators, including
Stream::aggregate. The operator converts upserts
into batches of updates, which is the input format of most DBSP
operators.
The operator assumes that the input vector is sorted by key and contains exactly one value per key.
This is a stateful operator that internally maintains the trace of the collection.
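The upsert-to-update conversion can likewise be sketched standalone (an illustration of the semantics above, not the DBSP code): each command is (key, Some(value)) to assign or (key, None) to remove, and the old value, if any, is retracted implicitly.

```rust
use std::collections::BTreeMap;

// Convert upsert commands into an indexed Z-set delta of
// (key, value, weight) tuples, retracting the previous value of each key.
fn upsert(
    state: &mut BTreeMap<i32, String>,
    commands: &[(i32, Option<String>)],
) -> Vec<(i32, String, i64)> {
    let mut delta = Vec::new();
    for (key, new) in commands {
        // Apply the command to the maintained map, capturing the old value.
        let old = match new {
            Some(v) => state.insert(*key, v.clone()),
            None => state.remove(key),
        };
        if let Some(old) = old {
            delta.push((*key, old, -1)); // retract the previous value
        }
        if let Some(v) = new {
            delta.push((*key, v.clone(), 1)); // insert the new value
        }
    }
    delta
}
```

(Re-assigning an unchanged value produces a -1/+1 pair for the same tuple; in a real Z-set these cancel out.)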
Source§impl<B> Stream<RootCircuit, B>
impl<B> Stream<RootCircuit, B>
Sourcepub fn lag<VL, PF>(
&self,
offset: isize,
project: PF,
) -> Stream<RootCircuit, TypedBatch<B::Key, Tup2<B::Val, VL>, ZWeight, DynOrdIndexedZSet<B::DynK, DynPair<B::DynV, DynData>>>>
pub fn lag<VL, PF>( &self, offset: isize, project: PF, ) -> Stream<RootCircuit, TypedBatch<B::Key, Tup2<B::Val, VL>, ZWeight, DynOrdIndexedZSet<B::DynK, DynPair<B::DynV, DynData>>>>
The lag operator matches each row in a group with the value at the given offset within the same group.
For each key in the input stream, it matches each associated value with
another value in the same group with a smaller (offset > 0) or greater
(offset < 0) index according to ascending order of values, applies
projection function project to it and outputs the input value along
with this projection.
The offset is computed as if each value occurred as many times as its weight. Values with negative weights are ignored; hence the output Z-set will only contain positive weights.
§Arguments
- offset - offset to the previous value.
- project - projection function to apply to the delayed row.
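A standalone sketch of this weight-aware lag semantics on a single group (an illustration, not the DBSP implementation): values with positive weights are expanded by multiplicity, sorted ascending, and each position is paired with the projection of the value `offset` positions earlier.

```rust
// For one group, return each (expanded) value together with the projection of
// the value `offset` positions before it, or None when no such value exists.
fn lag<T: Ord + Clone, P, F: Fn(&T) -> P>(
    group: &[(T, i64)],
    offset: usize,
    project: F,
) -> Vec<(T, Option<P>)> {
    let mut expanded: Vec<T> = group
        .iter()
        .filter(|&&(_, w)| w > 0) // values with negative weights are ignored
        .flat_map(|(v, w)| std::iter::repeat(v.clone()).take(*w as usize))
        .collect();
    expanded.sort(); // ascending order of values, as described above
    expanded
        .iter()
        .enumerate()
        .map(|(i, v)| {
            let lagged = i.checked_sub(offset).map(|j| project(&expanded[j]));
            (v.clone(), lagged)
        })
        .collect()
}
```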
Source§impl<K, V> Stream<RootCircuit, OrdIndexedZSet<K, V>>
impl<K, V> Stream<RootCircuit, OrdIndexedZSet<K, V>>
Sourcepub fn lag_custom_order<VL, OV, PF, CF, OF>(
&self,
offset: isize,
project: PF,
output: OF,
) -> Stream<RootCircuit, OrdIndexedZSet<K, OV>>
pub fn lag_custom_order<VL, OV, PF, CF, OF>( &self, offset: isize, project: PF, output: OF, ) -> Stream<RootCircuit, OrdIndexedZSet<K, OV>>
Like Stream::lag, but uses a custom ordering of values within the group
defined by the comparison function CF.
§Arguments
- offset - offset to the previous or next value.
- project - projection function to apply to the delayed row.
- output - output function that constructs the output value from the value of the current row and the projection of the delayed row.
pub fn lag_custom_order_persistent<VL, OV, PF, CF, OF>( &self, persistent_id: Option<&str>, offset: isize, project: PF, output: OF, ) -> Stream<RootCircuit, OrdIndexedZSet<K, OV>>
Source§impl<B> Stream<RootCircuit, B>
impl<B> Stream<RootCircuit, B>
Sourcepub fn topk_asc(
&self,
k: usize,
) -> Stream<RootCircuit, OrdIndexedZSet<B::Key, B::Val>>
pub fn topk_asc( &self, k: usize, ) -> Stream<RootCircuit, OrdIndexedZSet<B::Key, B::Val>>
Pick k smallest values in each group.
For each key in the input stream, removes all but k smallest values.
Sourcepub fn topk_desc(
&self,
k: usize,
) -> Stream<RootCircuit, OrdIndexedZSet<B::Key, B::Val>>
pub fn topk_desc( &self, k: usize, ) -> Stream<RootCircuit, OrdIndexedZSet<B::Key, B::Val>>
Pick k largest values in each group.
For each key in the input stream, removes all but k largest values.
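On a single group, the semantics of both variants reduces to a sort and a truncation; a minimal standalone sketch (not the DBSP implementation):

```rust
// Keep the k smallest values of one group; topk_desc is the same with the
// sort order reversed.
fn topk_asc<T: Ord + Clone>(group: &[T], k: usize) -> Vec<T> {
    let mut vals = group.to_vec();
    vals.sort();
    vals.truncate(k); // drop all but the k smallest values
    vals
}
```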
Source§impl<K, V> Stream<RootCircuit, OrdIndexedZSet<K, V>>
impl<K, V> Stream<RootCircuit, OrdIndexedZSet<K, V>>
Sourcepub fn topk_custom_order<F>(&self, k: usize) -> Selfwhere
F: CmpFunc<V>,
pub fn topk_custom_order<F>(&self, k: usize) -> Selfwhere
F: CmpFunc<V>,
Pick k smallest values in each group based on a custom comparison
function.
This method is similar to topk_asc, but instead
of ordering elements according to trait Ord for V, it uses a
user-defined comparison function F.
§Correctness
- CF must establish a total order over V, consistent with impl Eq for V, i.e., CF::cmp(v1, v2) == Equal <=> v1.eq(v2).
pub fn topk_custom_order_persistent<F>(
&self,
persistent_id: Option<&str>,
k: usize,
) -> Selfwhere
F: CmpFunc<V>,
Sourcepub fn topk_rank_custom_order<CF, EF, OF, OV>(
&self,
k: usize,
rank_eq_func: EF,
output_func: OF,
) -> Stream<RootCircuit, OrdIndexedZSet<K, OV>>
pub fn topk_rank_custom_order<CF, EF, OF, OV>( &self, k: usize, rank_eq_func: EF, output_func: OF, ) -> Stream<RootCircuit, OrdIndexedZSet<K, OV>>
Rank elements in the group and output all elements with rank <= k.
This operator implements the behavior of the following SQL pattern:
SELECT
...,
RANK() OVER (PARTITION BY .. ORDER BY ...) AS rank
FROM table
WHERE rank <= K
The CF type and the rank_eq_func function together establish the
ranking of values in the group:
- CF establishes a total ordering of elements such that v1 < v2 => rank(v1) <= rank(v2).
- rank_eq_func checks that two elements have equal rank, i.e., have equal values in all the ORDER BY columns in the SQL query above: rank_eq_func(v1, v2) <=> rank(v1) == rank(v2).
The output_func closure takes a value and its rank and produces an
output value.
§Correctness
- CF must establish a total order over V, consistent with impl Eq for V, i.e., CF::cmp(v1, v2) == Equal <=> v1.eq(v2).
- CF must be consistent with rank_eq_func, i.e., CF::cmp(v1, v2) == Equal => rank_eq_func(v1, v2).
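The RANK() behavior described above (ties share a rank, and the rank of a tie group is one more than the number of values strictly before it, so gaps appear after ties) can be sketched standalone (an illustration, not the DBSP implementation):

```rust
// Rank the values of one group and keep only those with rank <= k.
// `rank_eq` stands in for rank_eq_func: it decides whether two values tie.
fn topk_rank<T: Ord + Clone>(
    group: &[T],
    k: i64,
    rank_eq: impl Fn(&T, &T) -> bool,
) -> Vec<(i64, T)> {
    let mut vals = group.to_vec();
    vals.sort();
    let mut out = Vec::new();
    let mut rank = 1;
    for (i, v) in vals.iter().enumerate() {
        if i > 0 && !rank_eq(v, &vals[i - 1]) {
            rank = i as i64 + 1; // RANK leaves gaps after tie groups
        }
        if rank <= k {
            out.push((rank, v.clone()));
        }
    }
    out
}
```

For example, the group [20, 10, 10, 30] ranks as 1, 1, 3, 4: the value 20 gets rank 3, not 2, because two values precede it.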
pub fn topk_rank_custom_order_persistent<CF, EF, OF, OV>( &self, persistent_id: Option<&str>, k: usize, rank_eq_func: EF, output_func: OF, ) -> Stream<RootCircuit, OrdIndexedZSet<K, OV>>
Sourcepub fn topk_dense_rank_custom_order<CF, EF, OF, OV>(
&self,
k: usize,
rank_eq_func: EF,
output_func: OF,
) -> Stream<RootCircuit, OrdIndexedZSet<K, OV>>
pub fn topk_dense_rank_custom_order<CF, EF, OF, OV>( &self, k: usize, rank_eq_func: EF, output_func: OF, ) -> Stream<RootCircuit, OrdIndexedZSet<K, OV>>
Rank elements in the group using dense ranking and output all elements
with rank <= k.
Similar to topk_rank_custom_order,
but computes a dense ranking of elements in the group.
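A standalone sketch of the dense-ranking variant (an illustration, not the DBSP implementation): ties share a rank, and the next distinct value gets the previous rank plus one, so no gaps appear.

```rust
// Dense-rank the values of one group (equality via Ord, for simplicity)
// and keep only those with rank <= k.
fn topk_dense_rank<T: Ord + Clone>(group: &[T], k: i64) -> Vec<(i64, T)> {
    let mut vals = group.to_vec();
    vals.sort();
    let mut out = Vec::new();
    let mut rank = 0i64;
    let mut prev: Option<T> = None;
    for v in vals {
        if prev.as_ref() != Some(&v) {
            rank += 1; // dense ranking: increment once per distinct value
        }
        if rank <= k {
            out.push((rank, v.clone()));
        }
        prev = Some(v);
    }
    out
}
```

The group [20, 10, 10, 30] dense-ranks as 1, 1, 2, 3, whereas RANK() would produce 1, 1, 3, 4.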
pub fn topk_dense_rank_custom_order_persistent<CF, EF, OF, OV>( &self, persistent_id: Option<&str>, k: usize, rank_eq_func: EF, output_func: OF, ) -> Stream<RootCircuit, OrdIndexedZSet<K, OV>>
Sourcepub fn topk_row_number_custom_order<CF, OF, OV>(
&self,
k: usize,
output_func: OF,
) -> Stream<RootCircuit, OrdIndexedZSet<K, OV>>
pub fn topk_row_number_custom_order<CF, OF, OV>( &self, k: usize, output_func: OF, ) -> Stream<RootCircuit, OrdIndexedZSet<K, OV>>
Pick k smallest values in each group based on a custom comparison
function. Return the k elements along with their row numbers.
This operator implements the behavior of the following SQL pattern:
SELECT
...,
ROW_NUMBER() OVER (PARTITION BY .. ORDER BY ...) AS row_number
FROM table
WHERE row_number <= K
§Correctness
- CF must establish a total order over V, consistent with impl Eq for V, i.e., CF::cmp(v1, v2) == Equal <=> v1.eq(v2).
pub fn topk_row_number_custom_order_persistent<CF, OF, OV>( &self, persistent_id: Option<&str>, k: usize, output_func: OF, ) -> Stream<RootCircuit, OrdIndexedZSet<K, OV>>
impl<C, K1, V1> Stream<C, OrdIndexedZSet<K1, V1>>
pub fn outer_join_default<F, V2, O>(
&self,
other: &Stream<C, OrdIndexedZSet<K1, V2>>,
join_func: F,
) -> Stream<C, OrdZSet<O>>
Like Stream::outer_join, but uses default value for the missing side of the
join.
impl<C, I1> Stream<C, I1>
pub fn stream_join<F, I2, V>(
&self,
other: &Stream<C, I2>,
join: F,
) -> Stream<C, OrdZSet<V>>
Join two streams of batches.
The operator takes two streams of batches indexed with the same key type
(I1::Key = I2::Key) and outputs a stream obtained by joining each pair
of inputs.
Input streams will typically be produced by the
map_index operator.
§Type arguments
- F - join function type: maps a key and a pair of values from input batches to an output value.
- I1 - batch type in the first input stream.
- I2 - batch type in the second input stream.
- V - output value type.
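The per-batch join semantics can be sketched over plain maps (a standalone illustration of Z-set join semantics, not the DBSP API): for each key present in both indexed Z-sets, every pair of values is passed to the join function, and the output weight is the product of the two input weights.

```rust
use std::collections::BTreeMap;

// An indexed Z-set modeled as key -> [(value, weight)].
type IndexedZSet = BTreeMap<&'static str, Vec<(&'static str, i64)>>;

fn stream_join<F: Fn(&str, &str, &str) -> String>(
    left: &IndexedZSet,
    right: &IndexedZSet,
    join: F,
) -> BTreeMap<String, i64> {
    let mut out = BTreeMap::new();
    for (k, lvals) in left {
        if let Some(rvals) = right.get(k) {
            for (lv, lw) in lvals {
                for (rv, rw) in rvals {
                    // Output weight is the product of input weights.
                    *out.entry(join(k, lv, rv)).or_insert(0) += lw * rw;
                }
            }
        }
    }
    out.retain(|_, w| *w != 0); // drop tuples whose weights cancelled out
    out
}
```

Multiplying weights is what makes the join bilinear, and it gives deletions (negative weights) the expected effect: joining a retraction with weight -1 retracts the corresponding output tuples.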
pub fn stream_join_generic<F, I2, Z>(
&self,
other: &Stream<C, I2>,
join: F,
) -> Stream<C, Z>where
I2: IndexedZSet<Key = I1::Key, DynK = I1::DynK>,
I2::InnerBatch: Send,
Z: IndexedZSet,
F: Fn(&I1::Key, &I1::Val, &I2::Val) -> (Z::Key, Z::Val) + Clone + 'static,
Like Self::stream_join, but can return any batch type.
pub fn monotonic_stream_join<F, I2, Z>(
&self,
other: &Stream<C, I2>,
join: F,
) -> Stream<C, Z>where
I1: IndexedZSet,
I2: IndexedZSet<Key = I1::Key, DynK = I1::DynK>,
I2::InnerBatch: Send,
Z: ZSet,
F: Fn(&I1::Key, &I1::Val, &I2::Val) -> Z::Key + Clone + 'static,
More efficient than Self::stream_join, but the output of the join
function must grow monotonically as (k, v1, v2) tuples are fed to it
in lexicographic order.
One such monotonic function is a join function that returns (k, v1, v2) itself.
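The monotonicity requirement can be made concrete with a small standalone check (an illustration of the property, not part of the DBSP API): a join function qualifies if its outputs never decrease as (k, v1, v2) tuples are fed in lexicographic order.

```rust
// Check that f's outputs are non-decreasing over lexicographically
// sorted inputs.
fn is_monotone<F: Fn(&(u32, u32, u32)) -> (u32, u32, u32)>(
    inputs: &[(u32, u32, u32)], // must be sorted lexicographically
    f: F,
) -> bool {
    inputs.windows(2).all(|w| f(&w[0]) <= f(&w[1]))
}
```

The identity function passes this check, matching the example in the text; a function that reverses the order of v1 does not.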
pub fn stream_antijoin<I2>(&self, other: &Stream<C, I2>) -> Stream<C, I1>
Non-incremental antijoin operator.
pub fn join_generic<I2, F, Z, It>(
&self,
other: &Stream<C, I2>,
join: F,
) -> Stream<C, Z>where
I2: IndexedZSet<Key = I1::Key, DynK = I1::DynK>,
I2::InnerBatch: Send,
Z: IndexedZSet,
Box<Z::DynK>: Clone,
Box<Z::DynV>: Clone,
F: Fn(&I1::Key, &I1::Val, &I2::Val) -> It + Clone + 'static,
It: IntoIterator<Item = (Z::Key, Z::Val)> + 'static,
pub fn outer_join<I2, F, FL, FR, O>(
&self,
other: &Stream<C, I2>,
join_func: F,
left_func: FL,
right_func: FR,
) -> Stream<C, OrdZSet<O>>where
I2: IndexedZSet<Key = I1::Key, DynK = I1::DynK>,
I1::Inner: DynFilterMap,
I2::Inner: DynFilterMap,
I2::InnerBatch: Send,
O: DBData,
Box<I1::DynK>: Clone,
Box<I1::DynV>: Clone,
Box<I2::DynV>: Clone,
F: Fn(&I1::Key, &I1::Val, &I2::Val) -> O + Clone + 'static,
for<'a> FL: Fn(&I1::Key, &I1::Val) -> O + Clone + 'static,
for<'a> FR: Fn(&I2::Key, &I2::Val) -> O + Clone + 'static,
Outer join:
- returns the output of join_func for common keys.
- returns the output of left_func for keys only found in self, but not other.
- returns the output of right_func for keys only found in other, but not self.
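The three-way dispatch can be sketched over plain maps (a standalone illustration of the outer-join shape, not the DBSP operator):

```rust
use std::collections::BTreeMap;

// Keys in both maps go through join_func, left-only keys through
// left_func, right-only keys through right_func.
fn outer_join<V1, V2, O>(
    left: &BTreeMap<u32, V1>,
    right: &BTreeMap<u32, V2>,
    join_func: impl Fn(&u32, &V1, &V2) -> O,
    left_func: impl Fn(&u32, &V1) -> O,
    right_func: impl Fn(&u32, &V2) -> O,
) -> Vec<O> {
    let mut out = Vec::new();
    for (k, v1) in left {
        match right.get(k) {
            Some(v2) => out.push(join_func(k, v1, v2)),
            None => out.push(left_func(k, v1)),
        }
    }
    for (k, v2) in right {
        if !left.contains_key(k) {
            out.push(right_func(k, v2));
        }
    }
    out
}
```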
impl<C, I1> Stream<C, I1>
pub fn stream_join_range<I2, RF, JF, It>(
&self,
other: &Stream<C, I2>,
range_func: RF,
join_func: JF,
) -> Stream<C, OrdZSet<It::Item>>
Range-join two streams into an OrdZSet.
This operator is non-incremental, i.e., it joins the pair of batches it receives at each timestamp ignoring previous inputs.
pub fn stream_join_range_index<I2, K, V, RF, JF, It>(
&self,
other: &Stream<C, I2>,
range_func: RF,
join_func: JF,
) -> Stream<C, OrdIndexedZSet<K, V>>
Range-join two streams into an OrdIndexedZSet.
See module documentation for the definition of the range-join operator and its arguments.
In this version of the operator, the join_func closure returns
an iterator over (key, value) pairs used to assemble the output
indexed Z-set.
This operator is non-incremental, i.e., it joins the pair of batches it receives at each timestamp ignoring previous inputs.
pub fn stream_join_range_generic<I2, K, V, RF, JF, It, O>(
&self,
other: &Stream<C, I2>,
range_func: RF,
join_func: JF,
) -> Stream<C, O>
Like Self::dyn_stream_join_range, but can return any indexed Z-set
type.
impl<B> Stream<RootCircuit, B>where
B: IndexedZSet,
pub fn neighborhood(
&self,
neighborhood_descr: &NeighborhoodDescrStream<B::Key, B::Val>,
) -> NeighborhoodStream<B>
Returns a small contiguous range of rows (Neighborhood) of the input
table.
This operator helps to visualize the contents of the input table in a
UI. The UI client may not have enough throughput/memory to store the
entire table, and will instead limit its state to a small range of
rows that fit on the screen. We specify such a range, or
neighborhood, in terms of its center (or “anchor”), and the number
of rows preceding and following the anchor (see
NeighborhoodDescr). The user may be interested in a static
snapshot of the neighborhood or in a changing view. Both modes are
supported by this operator (see the reset argument). The output of
the operator is a stream of Neighborhoods.
NOTE: This operator assumes that the integral of the input stream does not contain negative weights (which should normally be the case) and may produce incorrect outputs otherwise.
§Arguments
- self - a stream of changes to an indexed Z-set.
- neighborhood_descr - contains the neighborhood descriptor to evaluate at every clock tick. Set to None to disable the operator (it will output an empty neighborhood).
§Output
Outputs a stream of changes to the neighborhood.
The output neighborhood will contain rows with indexes between
-descr.before and descr.after - 1. Row 0 is the anchor row, i.e.,
is the first row in the input stream greater than or equal to
descr.anchor. If there is no such row (i.e., all rows in the input
stream are smaller than the anchor), then the neighborhood will only
contain negative indexes.
The first index in the neighborhood may be greater
than -descr.before if the input stream doesn’t contain enough rows
preceding the specified anchor. The last index may be smaller than
descr.after - 1 if the input stream doesn’t contain descr.after
rows following the anchor point.
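The index arithmetic above can be sketched non-incrementally over a sorted snapshot (a standalone illustration, not the incremental DBSP operator): the first row greater than or equal to the anchor gets index 0, with up to `before` rows before it and `after` rows from it onward.

```rust
// Extract a (index, row) neighborhood around `anchor` from sorted data.
fn neighborhood(sorted: &[i64], anchor: i64, before: usize, after: usize) -> Vec<(i64, i64)> {
    // Index of the first row >= anchor; if there is none, every
    // returned index is negative.
    let pos = sorted.partition_point(|&v| v < anchor);
    let start = pos.saturating_sub(before);
    let end = (pos + after).min(sorted.len());
    sorted[start..end]
        .iter()
        .enumerate()
        .map(|(i, &v)| ((start + i) as i64 - pos as i64, v))
        .collect()
}
```

The clamping with saturating_sub and min reproduces the edge cases described above: fewer than `before` preceding rows or fewer than `after` following rows simply shrink the neighborhood.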
impl<C, D> Stream<C, D>
pub fn delta0_non_iterative<CC>(&self, subcircuit: &CC) -> Stream<CC, D>where
CC: Circuit<Parent = C>,
Import self from the parent circuit to subcircuit via the Delta0NonIterative
operator.
impl<B> Stream<RootCircuit, B>
pub fn stream_sample_keys(
&self,
sample_size: &Stream<RootCircuit, usize>,
) -> Stream<RootCircuit, TypedBatch<B::Key, (), ZWeight, DynVecZSet<B::DynK>>>
Generates a uniform random sample of keys from self.
At every clock tick, computes a random sample of the input batch
using BatchReader::sample_keys.
The sample_size stream
specifies the size of the sample to compute (use 0 when no sample
is needed at the current clock cycle to make sure the operator
doesn’t waste CPU cycles).
Maximal supported sample size is MAX_SAMPLE_SIZE. If the operator
receives a larger sample_size value, it treats it as
MAX_SAMPLE_SIZE.
Outputs a Z-set containing randomly sampled keys. Each key is output
with weight 1 regardless of its weight or the number of associated
values in the input batch.
This is not an incremental operator. It samples the input
batch received at the current clock cycle and not the integral
of the input stream. Prefix the call to stream_sample_keys() with
integrate_trace() to sample the integral of the input.
WARNING: This operator (by definition) returns non-deterministic outputs. As such it may not play well with most other DBSP operators and must be used with care.
pub fn stream_sample_unique_key_vals(
&self,
sample_size: &Stream<RootCircuit, usize>,
) -> Stream<RootCircuit, TypedBatch<Tup2<B::Key, B::Val>, (), ZWeight, DynVecZSet<DynPair<B::DynK, B::DynV>>>>
Generates a uniform random sample of (key,value) pairs from self,
assuming that self contains exactly one value per key.
Equivalent to self.map(|(k, v)| (k, v)).stream_sample_keys(),
but is more efficient.
pub fn stream_key_quantiles(
&self,
num_quantiles: &Stream<RootCircuit, usize>,
) -> Stream<RootCircuit, TypedBatch<B::Key, (), ZWeight, DynVecZSet<B::DynK>>>
Generates a subset of keys that partition the set of all keys in self
into num_quantiles + 1 approximately equal-size quantiles.
Internally, this operator uses the
stream_sample_keys operator to compute a
uniform random sample of size num_quantiles ^ 2 and then picks
every num_quantiles-th element of the sample.
Maximal supported num_quantiles value is MAX_QUANTILES. If the
operator receives a larger num_quantiles value, it treats it as
MAX_QUANTILES.
Outputs a Z-set containing <= num_quantiles keys. Each key is output
with weight 1 regardless of its weight or the number of associated
values in the input batch.
This is not an incremental operator. It samples the input
batch received at the current clock cycle and not the integral
of the input stream. Prefix the call to stream_key_quantiles() with
integrate_trace() to sample the integral of the input.
WARNING: This operator returns non-deterministic outputs, i.e., feeding the same input twice can produce different outputs. As such it may not play well with most other DBSP operators and must be used with care.
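The sample-then-stride scheme can be sketched in plain Rust (a standalone illustration of the quantile estimation idea, not the DBSP operator): sort a uniform sample of about num_quantiles ^ 2 keys and pick every num_quantiles-th element as an approximate quantile boundary.

```rust
// Approximate quantile boundaries from a uniform sample of keys.
fn key_quantiles(mut sample: Vec<u64>, num_quantiles: usize) -> Vec<u64> {
    sample.sort_unstable();
    sample
        .iter()
        .skip(num_quantiles.saturating_sub(1)) // first boundary
        .step_by(num_quantiles.max(1))         // then every num_quantiles-th key
        .copied()
        .take(num_quantiles)
        .collect()
}
```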
pub fn stream_unique_key_val_quantiles(
&self,
num_quantiles: &Stream<RootCircuit, usize>,
) -> Stream<RootCircuit, TypedBatch<Tup2<B::Key, B::Val>, (), ZWeight, DynVecZSet<DynPair<B::DynK, B::DynV>>>>
Generates a subset of (key, value) pairs that partition the set of all
tuples in self into num_quantiles + 1 approximately equal-size quantiles,
assuming that self contains exactly one value per key.
Equivalent to self.map(|(k, v)| (k, v)).stream_unique_key_val_quantiles(), but is more efficient.
impl<C, Pairs> Stream<C, Pairs>
pub fn semijoin_stream<Keys, Out>(
&self,
keys: &Stream<C, Keys>,
) -> Stream<C, Out>
Semijoin two streams of batches.
The operator takes two streams of batches indexed with the same key type
(Pairs::Key = Keys::Key) and outputs a stream obtained by joining each
pair of inputs.
Input streams will typically be produced by Stream::map_index().
§Type arguments
- Pairs - batch type in the first input stream.
- Keys - batch type in the second input stream.
- Out - output Z-set type.
impl<TS, V> Stream<RootCircuit, OrdIndexedZSet<TS, V>>
pub fn partitioned_rolling_aggregate_with_waterline<PK, OV, Agg, PF>(
&self,
waterline: &Stream<RootCircuit, TypedBox<TS, DynDataTyped<TS>>>,
partition_func: PF,
aggregator: Agg,
range: RelRange<TS>,
) -> OrdPartitionedOverStream<PK, TS, Agg::Output>
Similar to
partitioned_rolling_aggregate,
but uses waterline to bound its memory footprint.
Splits the input stream into non-overlapping
partitions using partition_func and for each input record
computes an aggregate over a relative time range (e.g., the
last three months) within its partition. Outputs the contents
of the input stream extended with the value of the aggregate.
This operator is incremental and will update previously computed outputs affected by new data. For example, a data point arriving out-of-order may affect previously computed rolling aggregate values at future times.
The waterline stream bounds the out-of-orderness of the input
data by providing a monotonically growing lower bound on
timestamps that can appear in the input stream. The operator
does not expect inputs with timestamps smaller than the current
waterline. The waterline value is used to bound the amount of
state maintained by the operator.
§Background
The rolling aggregate operator is typically applied to time series data
with bounded out-of-orderness, i.e., having seen a timestamp ts in
the input stream, the operator will never observe a timestamp
smaller than ts - b for some bound b. This in turn means that
the value of the aggregate will remain constant for timestamps that
only depend on times < ts - b. Hence, we do not need to maintain
the state needed to recompute these aggregates, which allows us to
bound the amount of state maintained by this operator.
The bound ts - b is known as “waterline” and can be computed, e.g., by
the waterline_monotonic operator.
§Arguments
- self - time series data indexed by time.
- waterline - monotonically growing lower bound on timestamps in the input stream.
- partition_func - function used to split inputs into non-overlapping partitions indexed by partition key of type PK.
- aggregator - aggregator used to summarize values within the relative time range range of each input timestamp.
- range - relative time range to aggregate over.
pub fn partitioned_rolling_aggregate_with_waterline_persistent<PK, OV, Agg, PF>( &self, persistent_id: Option<&str>, waterline: &Stream<RootCircuit, TypedBox<TS, DynDataTyped<TS>>>, partition_func: PF, aggregator: Agg, range: RelRange<TS>, ) -> OrdPartitionedOverStream<PK, TS, Agg::Output>
pub fn partitioned_rolling_aggregate<PK, OV, Agg, PF>(
&self,
partition_func: PF,
aggregator: Agg,
range: RelRange<TS>,
) -> OrdPartitionedOverStream<PK, TS, Agg::Output>
Rolling aggregate of a partitioned stream over time range.
For each record in the input stream, computes an aggregate over a relative time range (e.g., the last three months). Outputs the contents of the input stream extended with the value of the aggregate.
For each input record (p, (ts, v)), rolling aggregation finds all the
records (p, (ts2, x)) such that ts2 is in range(ts), applies
aggregator across these records to obtain a finalized value f,
and outputs (p, (ts, f)).
This operator is incremental and will update previously computed outputs affected by new data. For example, a data point arriving out-of-order may affect previously computed rolling aggregate value at future times.
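The (p, (ts, v)) semantics can be sketched non-incrementally in plain Rust (a standalone illustration with sum as the aggregator, not the incremental DBSP operator): for each record, aggregate all values in the same partition whose timestamp falls within the relative range, here [ts - range, ts].

```rust
// Non-incremental per-partition rolling sum over a relative time range.
fn rolling_sum(
    records: &[(&'static str, (i64, i64))], // (partition, (ts, value))
    range: i64,
) -> Vec<(&'static str, (i64, i64))> {
    records
        .iter()
        .map(|&(p, (ts, _))| {
            let sum = records
                .iter()
                // Same partition, timestamp within [ts - range, ts].
                .filter(|&&(p2, (ts2, _))| p2 == p && ts - range <= ts2 && ts2 <= ts)
                .map(|&(_, (_, v))| v)
                .sum();
            (p, (ts, sum))
        })
        .collect()
}
```

As the text says, every input record produces exactly one output record carrying the aggregate for its own timestamp.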
pub fn partitioned_rolling_aggregate_persistent<PK, OV, Agg, PF>( &self, persistent_id: Option<&str>, partition_func: PF, aggregator: Agg, range: RelRange<TS>, ) -> OrdPartitionedOverStream<PK, TS, Agg::Output>
pub fn partitioned_rolling_aggregate_linear<PK, OV, A, O, PF, WF, OF>(
&self,
partition_func: PF,
weigh_func: WF,
output_func: OF,
range: RelRange<TS>,
) -> OrdPartitionedOverStream<PK, TS, O>
A version of Self::partitioned_rolling_aggregate optimized for
linear aggregation functions. For each input record (p, (ts, v)),
it finds all the records (p, (ts2, x)) such that ts2 is in
range.range_of(ts), computes the sum s of f(x) across these
records, and outputs (p, (ts, Some(output_func(s)))).
Output records from linear aggregation contain an Option type because
there might be no records matching range.range_of(ts). If range
contains (relative) time 0, this never happens (because the record
containing ts itself is always a match), so in that case the
caller can safely unwrap() the Option.
In rolling aggregation, the number of output records matches the number of input records.
This method only works for linear aggregation functions f, i.e.,
functions that satisfy f(a+b) = f(a) + f(b). It will produce
incorrect results if f is not linear.
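The linearity requirement f(a+b) = f(a) + f(b) can be checked mechanically (a standalone sketch of the property, not part of the DBSP API); aggregates built from counting and summing weighted values satisfy it, while non-additive functions such as squaring (and, by extension, min/max-style aggregates) do not.

```rust
// Spot-check f(a + b) == f(a) + f(b) on a set of sample weight pairs.
fn is_linear(f: impl Fn(i64) -> i64, samples: &[(i64, i64)]) -> bool {
    samples.iter().all(|&(a, b)| f(a + b) == f(a) + f(b))
}
```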
pub fn partitioned_rolling_average<PK, OV, PF>(
&self,
partition_func: PF,
range: RelRange<TS>,
) -> OrdPartitionedOverStream<PK, TS, OV>
Incremental rolling average.
For each input record, it computes the average of the values in records
in the same partition in the time range specified by range.
Source§impl<B> Stream<RootCircuit, B>
impl<B> Stream<RootCircuit, B>
pub fn waterline_monotonic<TS, DynTS, IF, WF>(
&self,
init: IF,
waterline_func: WF,
) -> Stream<RootCircuit, TypedBox<TS, DynTS>>
Compute the waterline of a time series, where the waterline function is
monotonic in event time. The notion of time here is distinct from the
DBSP logical time and can be modeled using any type that implements
Ord.
We use the term “waterline” instead of the more conventional “watermark”, to avoid confusion with watermarks in systems like Flink.
Waterline is an attribute of a time series that indicates the latest timestamp such that no data points with timestamps older than the waterline should appear in the stream. Every record in the time series carries waterline information that can be extracted by applying a user-provided function to it. The waterline of the time series is the maximum of waterlines of all its data points.
This method computes the waterline of a time series assuming that the
waterline function is monotonic in the event time, e.g., waterline = event_time - 5s. Such waterlines are the most common in practice
and can be computed efficiently by only considering the latest
timestamp in each input batch. The method takes a stream of batches
indexed by timestamp and outputs a stream of waterlines (scalar
values). Its output at each timestamp is a scalar (not a Z-set),
computed as the maximum of the previous waterline and the largest
waterline in the new input batch.
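The efficient case described above can be sketched as a simple fold (a standalone illustration, not the DBSP operator): because the waterline function is monotonic in event time, only the largest timestamp of each batch matters, and the waterline never moves backward. The 5-second lag below is an illustrative out-of-orderness bound, matching the waterline = event_time - 5s example.

```rust
// One step of a monotonic waterline computation.
fn advance_waterline(prev: i64, batch_timestamps: &[i64]) -> i64 {
    let batch_max = batch_timestamps.iter().copied().max().unwrap_or(i64::MIN);
    // max(previous waterline, largest waterline in the new batch)
    prev.max(batch_max.saturating_sub(5))
}
```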
impl<K, V> Stream<RootCircuit, OrdIndexedZSet<K, V>>
pub fn window(
&self,
inclusive: (bool, bool),
bounds: &Stream<RootCircuit, (TypedBox<K, DynData>, TypedBox<K, DynData>)>,
) -> Stream<RootCircuit, OrdIndexedZSet<K, V>>
Extract a subset of values that fall within a moving window from a stream of time-indexed values.
This is a general form of the windowing operator that supports tumbling, rolling windows, watermarks, etc., by relying on a user-supplied function to compute window bounds at each clock cycle.
This operator maintains the window incrementally, i.e., it outputs changes to the contents of the window at each clock cycle. The complete contents of the window can be computed by integrating the output stream.
§Arguments
-
self- stream of indexed Z-sets (indexed by time). The notion of time here is distinct from the DBSP logical time and can be modeled using any type that implementsOrd. -
bounds- stream that contains window bounds to use at each clock cycle. At each clock cycle, it contains a(start_time, end_time)that describes a right-open time range[start_time..end_time), whereend_time >= start_time.start_timemust grow monotonically, i.e.,start_time1andstart_time2read from the stream at two successive clock cycles must satisfystart_time2 >= start_time1.
§Output
The output stream contains changes to the contents of the window: at
every clock cycle it retracts values that belonged to the window at
the previous cycle, but no longer do, and inserts new values added
to the window. The latter include new values in the input stream
that belong to the [start_time..end_time) range and values from
earlier inputs that fall within the new range, but not the previous
range.
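The retract-and-insert behavior can be sketched for the simplified case of sliding bounds over a static integrated set of timestamps (a standalone illustration, not the DBSP operator, which also handles new input values):

```rust
use std::collections::BTreeSet;

// Delta between two [start, end) windows over an integrated set of
// timestamps: -1 retracts rows leaving the window, +1 inserts rows
// entering it.
fn window_delta(
    times: &BTreeSet<i64>,
    prev: (i64, i64),
    next: (i64, i64),
) -> Vec<(i64, i64)> {
    let in_range = |t: i64, (s, e): (i64, i64)| s <= t && t < e;
    times
        .iter()
        .filter_map(|&t| match (in_range(t, prev), in_range(t, next)) {
            (true, false) => Some((t, -1)), // left the window: retract
            (false, true) => Some((t, 1)),  // entered the window: insert
            _ => None,                      // unchanged membership
        })
        .collect()
}
```

Integrating these deltas reproduces the full window contents at each step, as described above.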
§Circuit
bounds
───────────────────────────────────────────────────┐
│
┌────────────────────────────────────────┐ │
│ │ │
│ ▼ ▼
self │ ┌────────────────┐ ┌───────────┐
───────┴────►│ TraceAppend ├──┐ │ Window ├─────►
└────────────────┘ │ └───────────┘
▲ │ ▲
│ ┌──┐ │ │
└────┤z1│◄────────┘ │
└┬─┘ │
│ trace │
└─────────────────────────────┘
impl<C, K, V, R, B> Stream<C, TypedBatch<K, V, R, B>>
pub fn trace(&self) -> Stream<C, TypedBatch<K, V, R, TimedSpine<B, C>>>
Record batches in self in a trace.
This operator labels each untimed batch in the stream with the current timestamp and adds it to a trace.
pub fn trace_with_bound<T>(
&self,
lower_key_bound: TraceBound<B::Key>,
lower_val_bound: TraceBound<B::Val>,
) -> Stream<C, TypedBatch<K, V, R, TimedSpine<B, C>>>
Record batches in self in a trace with bounds lower_key_bound and
lower_val_bound.
┌─────────────┐ trace
self ───►│ TraceAppend ├─────────┐───► output
└─────────────┘ │
▲ │
│ │
│ local ┌───────┐ │z1feedback
└─────────┤Z1Trace│◄──┘
└───────┘
impl<C, B> Stream<C, B>
pub fn integrate_trace_retain_keys<TS, RK>(
&self,
bounds_stream: &Stream<C, TypedBox<TS, DynData>>,
retain_key_func: RK,
)
Like integrate_trace, but additionally applies a retainment policy to
keys in the trace.
§Background
Relations that store time series data typically have the property that any new updates can only affect records with recent timestamps. Depending on how the relation is used in queries this might mean that, while records with older timestamps still exist in the relation, they cannot affect any future incremental computation and therefore don’t need to be stored.
§Design
We support two mechanisms to specify and eventually discard such unused records.
The first mechanism, exposed via the
integrate_trace_with_bound
method, is only applicable when keys and/or values in the collection
are ordered by time. It allows each consumer of the trace to specify
a lower bound on the keys and values it is interested in. The
effective bound is the minimum of all bounds specified by individual
consumers.
The second mechanism, implemented by this method and the
integrate_trace_retain_values
method, is more general and allows the caller to specify an
arbitrary condition on keys and values in the trace respectively.
Keys or values that don’t satisfy the condition are eventually
reclaimed by the trace. This mechanism is applicable to collections
that are not ordered by time. Hence it doesn’t require rearranging
the data in time order. Furthermore, it is applicable to collections
that contain multiple timestamp columns. Such multidimensional
timestamps only form a partial order.
Unlike the first mechanism, this mechanism only allows one global
condition to be applied to the stream. This bound affects all
operators that use the trace of the stream, i.e., call
integrate_trace (or trace in the root scope) on it. This includes
for instance join, aggregate, and distinct. All such operators
will reference the same instance of a trace. Therefore bounds
specified by this API must be based on a global analysis of the
entire program.
The two mechanisms described above interact in different ways for keys and values. For keys, the lower bound and the retainment condition are independent and can be active at the same time. Internally, they are enforced using different techniques. Lower bounds are enforced at essentially zero cost. The retention condition is more expensive, but more general.
For values, only one of the two mechanisms can be enabled for any given stream. Whenever a retainment condition is specified it supersedes any lower bounds constraints.
§Arguments
- bounds_stream - This stream carries scalar values (i.e., single records, not Z-sets). The key retainment condition is defined relative to the last value received from this stream. Typically, this value represents the lowest upper bound of all partially ordered timestamps in self or some other stream, computed with the help of the waterline operator and adjusted by some constant offsets, dictated, e.g., by window sizes used in the queries and the maximal out-of-orderness of data in the input streams.
- retain_key_func - given the value received from the bounds_stream at the last clock cycle and a key, returns true if the key should be retained in the trace and false if it should be discarded.
§Correctness
- As discussed above, the retainment policy set using this method applies to all consumers of the trace. An incorrect policy may reclaim keys that are still needed by some of the operators, leading to incorrect results. Computing a correct retainment policy can be a subtle and error-prone task, which is probably best left to automatic tools like compilers.
- The retainment policy set using this method only applies to self, but not any stream derived from it. In particular, if self is re-sharded using the shard operator, then it may be necessary to call integrate_trace_retain_keys on the resulting stream. In general, computing a correct retainment policy requires keeping track of:
  - Streams that are sharded by construction, and hence the shard operator is a no-op for such streams. For instance, the add_input_set and aggregate operators produce sharded streams.
  - Operators that shard their input streams, e.g., join.
- This method should be invoked at most once for a stream.
- retain_key_func must be monotone in its first argument: for any timestamp ts1 and key k such that retain_key_func(ts1, k) = false, and for any ts2 >= ts1, it must hold that retain_key_func(ts2, k) = false, i.e., once a key is rejected, it will remain rejected as the bound increases.
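A typical monotone condition can be sketched as follows (a standalone illustration of the required property, not the DBSP API): keys are timestamps, and a key is kept while it is within an illustrative lag of the bound. Once the bound passes key_ts + lag, the key stays rejected for every larger bound, satisfying the monotonicity requirement.

```rust
// Monotone key-retainment condition: keep keys within `lag` of the bound.
fn retain_key(bound: i64, key_ts: i64, lag: i64) -> bool {
    key_ts >= bound - lag
}
```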
pub fn integrate_trace_retain_values<TS, RV>(
&self,
bounds_stream: &Stream<C, TypedBox<TS, DynData>>,
retain_value_func: RV,
)
Similar to
integrate_trace_retain_keys,
but applies a retainment policy to values in the trace.
pub fn integrate_trace_retain_values_last_n<TS, RV>( &self, bounds_stream: &Stream<C, TypedBox<TS, DynData>>, retain_value_func: RV, n: usize, )
pub fn integrate_trace_retain_values_top_n<TS, RV>( &self, bounds_stream: &Stream<C, TypedBox<TS, DynData>>, retain_value_func: RV, n: usize, )
pub fn integrate_trace(&self) -> Stream<C, Spine<B>>
Constructs and returns an untimed trace of this stream.
The trace is unbounded, meaning that data will not be discarded because
it has a low key or value. Filter functions set with
integrate_trace_retain_keys or
integrate_trace_retain_values
can still discard data.
The result batch is stored durably for fault tolerance.
pub fn integrate_trace_with_bound(
&self,
lower_key_bound: TraceBound<<B::Inner as DynBatchReader>::Key>,
lower_val_bound: TraceBound<<B::Inner as DynBatchReader>::Val>,
) -> Stream<C, Spine<B>>
Constructs and returns an untimed trace of this stream.
Data in the trace with a key less than lower_key_bound or value less
than lower_val_bound can be discarded, although these bounds can be
lowered later (discarding less data). Filter functions set with
integrate_trace_retain_keys or
integrate_trace_retain_values
can still discard data.
The result batch is stored durably for fault tolerance.
impl<K, V> Stream<RootCircuit, OrdIndexedZSet<K, V>>
pub fn aggregate<A>( &self, aggregator: A, ) -> Stream<RootCircuit, OrdIndexedZSet<K, A::Output>>
pub fn aggregate_persistent<A>( &self, persistent_id: Option<&str>, aggregator: A, ) -> Stream<RootCircuit, OrdIndexedZSet<K, A::Output>>
pub fn aggregate_linear_postprocess<F, A, OF, OV>( &self, f: F, of: OF, ) -> Stream<RootCircuit, OrdIndexedZSet<K, OV>>
pub fn aggregate_linear_postprocess_persistent<F, A, OF, OV>( &self, persistent_id: Option<&str>, f: F, of: OF, ) -> Stream<RootCircuit, OrdIndexedZSet<K, OV>>
pub fn join<F, V2, OV>( &self, other: &Stream<RootCircuit, OrdIndexedZSet<K, V2>>, join: F, ) -> Stream<RootCircuit, OrdZSet<OV>>
pub fn join_balanced_inner<F, V2, OV>(
&self,
other: &Stream<RootCircuit, OrdIndexedZSet<K, V2>>,
join: F,
) -> Stream<RootCircuit, OrdZSet<OV>>
An adaptive version of join that can change its partitioning policy dynamically to avoid skew.
pub fn join_balanced<F, V2, OV>(
&self,
other: &Stream<RootCircuit, OrdIndexedZSet<K, V2>>,
join: F,
) -> Stream<RootCircuit, OrdZSet<OV>>
Behaves as join_balanced_inner when adaptive joins are enabled in dev tweaks and as join otherwise.
pub fn left_join<F, V2, OV>( &self, other: &Stream<RootCircuit, OrdIndexedZSet<K, Option<V2>>>, join: F, ) -> Stream<RootCircuit, OrdZSet<OV>>
Left-join self with other.
§Important
The other stream must not contain any None values. If it does, the result may be incorrect.
The type signature uses Option for values for performance reasons, to avoid an extra internal
transformation.
pub fn left_join_balanced_inner<F, V2, OV>( &self, other: &Stream<RootCircuit, OrdIndexedZSet<K, Option<V2>>>, join: F, ) -> Stream<RootCircuit, OrdZSet<OV>>
An adaptive version of left_join that dynamically changes its partitioning policy to avoid skew.
pub fn left_join_balanced<F, V2, OV>( &self, other: &Stream<RootCircuit, OrdIndexedZSet<K, Option<V2>>>, join: F, ) -> Stream<RootCircuit, OrdZSet<OV>>
Behaves as left_join_balanced_inner when adaptive joins are enabled in dev tweaks and as left_join otherwise.
pub fn join_flatmap<F, V2, OV, It>( &self, other: &Stream<RootCircuit, OrdIndexedZSet<K, V2>>, join: F, ) -> Stream<RootCircuit, OrdZSet<OV>>
pub fn join_flatmap_balanced_inner<F, V2, OV, It>( &self, other: &Stream<RootCircuit, OrdIndexedZSet<K, V2>>, join: F, ) -> Stream<RootCircuit, OrdZSet<OV>>
An adaptive version of join_flatmap that dynamically changes its partitioning policy to avoid skew.
pub fn join_flatmap_balanced<F, V2, OV, It>( &self, other: &Stream<RootCircuit, OrdIndexedZSet<K, V2>>, join: F, ) -> Stream<RootCircuit, OrdZSet<OV>>
Behaves as join_flatmap_balanced_inner when adaptive joins are enabled in dev tweaks and as join_flatmap otherwise.
pub fn left_join_flatmap<F, V2, OV, It>( &self, other: &Stream<RootCircuit, OrdIndexedZSet<K, Option<V2>>>, join: F, ) -> Stream<RootCircuit, OrdZSet<OV>>
Like left_join, but can produce multiple output values for each (k, v1, v2) tuple.
pub fn left_join_flatmap_balanced_inner<F, V2, OV, It>( &self, other: &Stream<RootCircuit, OrdIndexedZSet<K, Option<V2>>>, join: F, ) -> Stream<RootCircuit, OrdZSet<OV>>
An adaptive version of left_join_flatmap that dynamically changes its partitioning policy to avoid skew.
pub fn left_join_flatmap_balanced<F, V2, OV, It>( &self, other: &Stream<RootCircuit, OrdIndexedZSet<K, Option<V2>>>, join: F, ) -> Stream<RootCircuit, OrdZSet<OV>>
Behaves as left_join_flatmap_balanced_inner when adaptive joins are enabled in dev tweaks and as left_join_flatmap otherwise.
pub fn join_index<F, V2, OK, OV, It>( &self, other: &Stream<RootCircuit, OrdIndexedZSet<K, V2>>, join: F, ) -> Stream<RootCircuit, OrdIndexedZSet<OK, OV>>
pub fn join_index_balanced_inner<F, V2, OK, OV, It>( &self, other: &Stream<RootCircuit, OrdIndexedZSet<K, V2>>, join: F, ) -> Stream<RootCircuit, OrdIndexedZSet<OK, OV>>
An adaptive version of join_index that dynamically changes its partitioning policy to avoid skew.
pub fn join_index_balanced<F, V2, OK, OV, It>( &self, other: &Stream<RootCircuit, OrdIndexedZSet<K, V2>>, join: F, ) -> Stream<RootCircuit, OrdIndexedZSet<OK, OV>>
Behaves as join_index_balanced_inner when adaptive joins are enabled in dev tweaks and as join_index otherwise.
pub fn left_join_index<F, V2, OK, OV, It>( &self, other: &Stream<RootCircuit, OrdIndexedZSet<K, Option<V2>>>, join: F, ) -> Stream<RootCircuit, OrdIndexedZSet<OK, OV>>
Like left_join_flatmap, but produces an indexed output stream.
pub fn left_join_index_balanced_inner<F, V2, OK, OV, It>( &self, other: &Stream<RootCircuit, OrdIndexedZSet<K, Option<V2>>>, join: F, ) -> Stream<RootCircuit, OrdIndexedZSet<OK, OV>>
An adaptive version of left_join_index that dynamically changes its partitioning policy to avoid skew.
pub fn left_join_index_balanced<F, V2, OK, OV, It>( &self, other: &Stream<RootCircuit, OrdIndexedZSet<K, Option<V2>>>, join: F, ) -> Stream<RootCircuit, OrdIndexedZSet<OK, OV>>
Behaves as left_join_index_balanced_inner when adaptive joins are enabled in dev tweaks and as left_join_index otherwise.
pub fn antijoin<K2, V2>( &self, other: &Stream<RootCircuit, OrdIndexedZSet<K2, V2>>, ) -> Self
pub fn distinct(&self) -> Self
pub fn hash_distinct(&self) -> Self
pub fn waterline<TS, WF, IF, LB>( &self, init: IF, extract_ts: WF, least_upper_bound: LB, ) -> Stream<RootCircuit, TypedBox<TS, DynData>>
pub fn waterline_persistent<TS, WF, IF, LB>( &self, persistent_id: Option<&str>, init: IF, extract_ts: WF, least_upper_bound: LB, ) -> Stream<RootCircuit, TypedBox<TS, DynData>>
pub fn filter<F>(&self, filter_func: F) -> Self
pub fn map<F, OK>(&self, map_func: F) -> Stream<RootCircuit, OrdZSet<OK>>
pub fn map_index<F, OK, OV>( &self, map_func: F, ) -> Stream<RootCircuit, OrdIndexedZSet<OK, OV>>
pub fn flat_map<F, I>(&self, func: F) -> Stream<RootCircuit, OrdZSet<I::Item>>
pub fn flat_map_index<F, OK, OV, I>( &self, func: F, ) -> Stream<RootCircuit, OrdIndexedZSet<OK, OV>>
impl<K> Stream<RootCircuit, OrdZSet<K>>
where
    K: DBData,
pub fn distinct(&self) -> Self
pub fn hash_distinct(&self) -> Self
pub fn waterline<TS, WF, IF, LB>( &self, init: IF, extract_ts: WF, least_upper_bound: LB, ) -> Stream<RootCircuit, TypedBox<TS, DynData>>
pub fn waterline_persistent<TS, WF, IF, LB>( &self, persistent_id: Option<&str>, init: IF, extract_ts: WF, least_upper_bound: LB, ) -> Stream<RootCircuit, TypedBox<TS, DynData>>
pub fn filter<F>(&self, filter_func: F) -> Self
pub fn map<F, OK>(&self, map_func: F) -> Stream<RootCircuit, OrdZSet<OK>>
pub fn map_index<F, OK, OV>( &self, map_func: F, ) -> Stream<RootCircuit, OrdIndexedZSet<OK, OV>>
pub fn flat_map<F, I>(&self, func: F) -> Stream<RootCircuit, OrdZSet<I::Item>>
pub fn flat_map_index<F, OK, OV, I>( &self, func: F, ) -> Stream<RootCircuit, OrdIndexedZSet<OK, OV>>
pub fn controlled_key_filter_typed<T, E, F, RF>( &self, threshold: &Stream<RootCircuit, T>, filter_func: F, report_func: RF, ) -> (Self, Stream<RootCircuit, OrdZSet<E>>)
impl<K, V> Stream<NestedCircuit, OrdIndexedZSet<K, V>>
pub fn aggregate<A>( &self, aggregator: A, ) -> Stream<NestedCircuit, OrdIndexedZSet<K, A::Output>>
pub fn aggregate_persistent<A>( &self, persistent_id: Option<&str>, aggregator: A, ) -> Stream<NestedCircuit, OrdIndexedZSet<K, A::Output>>
pub fn aggregate_linear_postprocess<F, A, OF, OV>( &self, f: F, of: OF, ) -> Stream<NestedCircuit, OrdIndexedZSet<K, OV>>
pub fn aggregate_linear_postprocess_persistent<F, A, OF, OV>( &self, persistent_id: Option<&str>, f: F, of: OF, ) -> Stream<NestedCircuit, OrdIndexedZSet<K, OV>>
pub fn join<F, V2, OV>( &self, other: &Stream<NestedCircuit, OrdIndexedZSet<K, V2>>, join: F, ) -> Stream<NestedCircuit, OrdZSet<OV>>
pub fn join_flatmap<F, V2, OV, It>( &self, other: &Stream<NestedCircuit, OrdIndexedZSet<K, V2>>, join: F, ) -> Stream<NestedCircuit, OrdZSet<OV>>
pub fn join_index<F, V2, OK, OV, It>( &self, other: &Stream<NestedCircuit, OrdIndexedZSet<K, V2>>, join: F, ) -> Stream<NestedCircuit, OrdIndexedZSet<OK, OV>>
pub fn antijoin<K2, V2>( &self, other: &Stream<NestedCircuit, OrdIndexedZSet<K2, V2>>, ) -> Self
pub fn distinct(&self) -> Self
pub fn hash_distinct(&self) -> Self
pub fn filter<F>(&self, filter_func: F) -> Self
pub fn map<F, OK>(&self, map_func: F) -> Stream<NestedCircuit, OrdZSet<OK>>
pub fn map_index<F, OK, OV>( &self, map_func: F, ) -> Stream<NestedCircuit, OrdIndexedZSet<OK, OV>>
pub fn flat_map<F, I>(&self, func: F) -> Stream<NestedCircuit, OrdZSet<I::Item>>
pub fn flat_map_index<F, OK, OV, I>( &self, func: F, ) -> Stream<NestedCircuit, OrdIndexedZSet<OK, OV>>
impl<K> Stream<NestedCircuit, OrdZSet<K>>
where
    K: DBData,
pub fn distinct(&self) -> Self
pub fn hash_distinct(&self) -> Self
pub fn filter<F>(&self, filter_func: F) -> Self
pub fn map<F, OK>(&self, map_func: F) -> Stream<NestedCircuit, OrdZSet<OK>>
pub fn map_index<F, OK, OV>( &self, map_func: F, ) -> Stream<NestedCircuit, OrdIndexedZSet<OK, OV>>
pub fn flat_map<F, I>(&self, func: F) -> Stream<NestedCircuit, OrdZSet<I::Item>>
pub fn flat_map_index<F, OK, OV, I>( &self, func: F, ) -> Stream<NestedCircuit, OrdIndexedZSet<OK, OV>>
Trait Implementations
impl<C, B> RecursiveStreams<C> for Stream<C, B>
type Feedback = DelayedFeedback<C, B>
DelayedFeedback type for a group of streams; contains a DelayedFeedback instance for each stream in the group.
type Export = Stream<<C as Circuit>::Parent, Spine<B>>
type Output = Stream<<C as Circuit>::Parent, B>
type Factories = DistinctFactories<B, <C as WithClock>::Time>
fn new(circuit: &C, factories: &Self::Factories) -> (Self::Feedback, Self)
fn export(self, factories: &Self::Factories) -> Self::Export
Exports self to the parent circuit.
fn consolidate( exports: Self::Export, factories: &Self::Factories, ) -> Self::Output
Applies Stream::dyn_consolidate to all streams in exports.
impl<K, V, B, C> RecursiveStreams<C> for Stream<C, TypedBatch<K, V, ZWeight, B>>
type Inner = Stream<C, B>
type Output = Stream<<C as Circuit>::Parent, TypedBatch<K, V, i64, B>>
unsafe fn typed(inner: &Self::Inner) -> Self
unsafe fn typed_exports( inner: &<Self::Inner as DynRecursiveStreams<C>>::Output, ) -> Self::Output
fn inner(&self) -> Self::Inner
fn factories() -> <Self::Inner as DynRecursiveStreams<C>>::Factories
impl<C, D> StreamMetadata for Stream<C, D>
where
    C: Clone + 'static,
    D: 'static,
fn stream_id(&self) -> StreamId
fn local_node_id(&self) -> NodeId
fn origin_node_id(&self) -> &GlobalNodeId
fn clear_consumer_count(&self)
fn num_consumers(&self) -> usize
fn register_consumer(&self)
fn consume_token(&self)
See StreamValue::take.
Auto Trait Implementations
impl<C, D> Freeze for Stream<C, D> where C: Freeze
impl<C, D> !RefUnwindSafe for Stream<C, D>
impl<C, D> !Send for Stream<C, D>
impl<C, D> !Sync for Stream<C, D>
impl<C, D> Unpin for Stream<C, D> where C: Unpin
impl<C, D> UnsafeUnpin for Stream<C, D> where C: UnsafeUnpin
impl<C, D> !UnwindSafe for Stream<C, D>
Blanket Implementations
impl<T> ArchivePointee for T
type ArchivedMetadata = ()
fn pointer_metadata( _: &<T as ArchivePointee>::ArchivedMetadata, ) -> <T as Pointee>::Metadata
impl<T> BorrowMut<T> for T where T: ?Sized
fn borrow_mut(&mut self) -> &mut T
impl<T> CloneToUninit for T where T: Clone
impl<F, W, T, D> Deserialize<With<T, W>, D> for F
impl<T> FutureExt for T
fn with_context(self, otel_cx: Context) -> WithContext<Self>
fn with_current_context(self) -> WithContext<Self>
impl<T> Instrument for T
fn instrument(self, span: Span) -> Instrumented<Self>
fn in_current_span(self) -> Instrumented<Self>
impl<T> IntoEither for T
fn into_either(self, into_left: bool) -> Either<Self, Self>
Converts self into a Left variant of Either<Self, Self> if into_left is true, and into a Right variant otherwise.
fn into_either_with<F>(self, into_left: F) -> Either<Self, Self>
Converts self into a Left variant of Either<Self, Self> if into_left(&self) returns true, and into a Right variant otherwise.