

Struct Stream 

Source
pub struct Stream<C, D> { /* private fields */ }

A Stream<C, D> stores the output value of type D of an operator in a circuit with type C.

Circuits are synchronous, meaning that each value is produced and consumed in the same clock cycle, so there can be at most one value in the stream at any time.

The value type D may be any type, although most Stream methods impose additional requirements. Since a stream must yield one data item per clock cycle, the rate at which data arrives is important to the choice of type. If, for example, an otherwise scalar input stream might not have new data on every cycle, an Option type could represent that, and to batch multiple pieces of data in a single step, one might use Vec or another collection type.

In practice, D is often a special collection type called an “indexed Z-set”, represented as trait IndexedZSet. An indexed Z-set is conceptually a set of (key, value, weight) tuples. Indexed Z-sets have a specialization called a “non-indexed Z-set” (ZSet) that contains key and weight only. Indexed and non-indexed Z-sets are both subtraits of a higher-level Batch trait. Many operators on streams work only with an indexed or non-indexed Z-set or another batch type as D.
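As a mental model only (DBSP's actual batch types are sorted, trait-based, and far more efficient), a non-indexed Z-set can be sketched as a map from keys to weights, where updating a key adds to its weight and weight-zero entries disappear:

```rust
use std::collections::HashMap;

/// A toy non-indexed Z-set: a map from keys to (possibly negative) weights.
/// Mental model only; DBSP's real Batch implementations look nothing like this.
#[derive(Debug, Default, PartialEq)]
pub struct ToyZSet {
    pub weights: HashMap<String, i64>,
}

impl ToyZSet {
    /// Add `weight` copies of `key`; entries whose weight reaches 0 vanish.
    pub fn update(&mut self, key: &str, weight: i64) {
        let w = self.weights.entry(key.to_string()).or_insert(0);
        *w += weight;
        if *w == 0 {
            self.weights.remove(key);
        }
    }
}

fn main() {
    let mut z = ToyZSet::default();
    z.update("apple", 2); // two apples
    z.update("pear", 1);
    z.update("apple", -2); // retract both apples
    assert_eq!(z.weights.get("pear"), Some(&1));
    assert!(!z.weights.contains_key("apple")); // weight-0 entries disappear
}
```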

§Data streams versus delta streams

A value in a Stream can represent data, or it can represent a delta (also called an “update”). Most streams carry data types that could have either meaning. In particular, a stream of indexed or non-indexed Z-sets can carry either:

  • In a stream of data, the weight indicates the multiplicity of a key. A negative weight has no natural interpretation and might indicate a bug.

  • In a stream of deltas or updates, a positive weight represents insertions and a negative weight represents deletions.
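The two interpretations connect through weight addition: applying a delta to data just adds the weights, with positive weights inserting, negative weights deleting, and weight-zero entries dropping out. A toy sketch (using the plain key-to-weight map above as a stand-in for a Z-set):

```rust
use std::collections::HashMap;

type ZSet = HashMap<&'static str, i64>;

/// Applying a delta to data is weight addition: positive weights insert,
/// negative weights delete, and entries that reach weight 0 drop out.
fn apply_delta(mut data: ZSet, delta: &ZSet) -> ZSet {
    for (key, dw) in delta {
        let w = data.entry(*key).or_insert(0);
        *w += *dw;
        if *w == 0 {
            data.remove(key);
        }
    }
    data
}

fn main() {
    let data: ZSet = HashMap::from([("alice", 1), ("bob", 1)]);
    // Delta: delete "bob", insert "carol".
    let delta: ZSet = HashMap::from([("bob", -1), ("carol", 1)]);
    assert_eq!(
        apply_delta(data, &delta),
        HashMap::from([("alice", 1), ("carol", 1)])
    );
}
```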

There’s no way to distinguish a data stream from a delta stream from just the type of the Stream since, as described above, a stream of Z-sets can be either one. Some operators make sense for either kind of stream; for example, adding streams of Z-sets with plus works equally well in either case, or even for adding a delta to data. But other operations make sense only for streams of data or only for streams of deltas. In these cases, Stream often provides an operator for each type of stream, and the programmer must choose the right one, since the types themselves don’t help.

Stream refers to operators specifically for streams of data as “nonincremental”. These operators, which have stream in their name, e.g. stream_join, take streams of data as input and produce one as output. They act as “lifted scalar operators” that don’t maintain state across invocations and only act on their immediate inputs, that is, each output is produced by independently applying the operator to the individual inputs received in the current step:

      ┌─────────────┐
a ───►│    non-     │
      │ incremental ├───► c
b ───►│  operator   │
      └─────────────┘

Stream refers to operators meant for streams of deltas as “incremental”. These operators take streams of deltas as input and produce a stream of deltas as output. Such operators could be implemented, inefficiently, in terms of a nonincremental version by putting an integration operator before each input and a differentiation operator after the output:

        ┌───┐      ┌─────────────┐
Δa ────►│ I ├─────►│             │
        └───┘      │    non-     │    ┌───┐
                   │ incremental ├───►│ D ├───► Δc
        ┌───┐      │  operator   │    └───┘
Δb ────►│ I ├─────►│             │
        └───┘      └─────────────┘

§Operator naming convention

Stream uses _index and _generic suffixes and stream_ prefix to declare variations of basic operations, e.g., map, map_index, map_generic, map_index_generic, join, stream_join:

  • stream_ prefix: This prefix indicates that the operator is “nonincremental”, that is, that it works with streams of data, rather than streams of deltas (see Data streams versus delta streams, above).

  • _generic suffix: Most operators can assemble their outputs into any collection type that implements the Batch trait. In practice, we typically use OrdIndexedZSet for indexed batches and OrdZSet for non-indexed batches. Methods without the _generic suffix return these concrete types, eliminating the need to type-annotate each invocation, while _generic methods can be used to return arbitrary custom Batch implementations.

  • _index suffix: Methods without the _index suffix return non-indexed batches. <method>_index methods combine the effects of <method> and index, e.g., stream.map_index(closure) is functionally equivalent to, but more efficient than, stream.map(closure).index().

§Catalog of stream operators

Stream methods are the main way to perform computations with streams. The number of available methods can be overwhelming, so the subsections below categorize them into functionally related groups.

§Input operators

Most streams are obtained via methods or traits that operate on Streams. The input operators create the initial input streams for these other methods to work with.

Circuit::add_source is the fundamental way to add an input stream. Using it directly makes sense for cases like generating input using a function (perhaps using Generator) or reading data from a file. More commonly, RootCircuit offers the add_input_* functions as convenience wrappers for add_source. Each one returns a tuple of:

  • A Stream that can be attached as input to operators in the circuit (within the constructor function passed to RootCircuit::build only).

  • An input handle that can be used to add input to the stream from outside the circuit. In a typical scenario, the closure passed to build will return all of its input handles, which are used at runtime to feed new inputs to the circuit at each step. Different functions return different kinds of input handles.

Use RootCircuit::add_input_indexed_zset or RootCircuit::add_input_zset to create an (indexed) Z-set input stream. There’s also RootCircuit::add_input_set and RootCircuit::add_input_map to simplify cases where a regular set or map is easier to use than a Z-set. The latter functions maintain an extra internal table tracking the contents of the set or map, so prefer the plain Z-set functions when either will do.

For special cases, there’s also RootCircuit::add_input_stream<T>. The InputHandle that it returns needs to be told which workers to feed the data to, which makes it harder to use. It might be useful for feeding non-relational data to the circuit, such as the current physical time. DBSP does not know how to automatically distribute such values across workers, so the caller must decide whether to send the value to one specific worker or to broadcast it to everyone.

It’s common to pass explicit type arguments to the functions that create input streams, e.g.:

circuit.add_input_indexed_zset::<KeyType, ValueType, WeightType>()

§Output and debugging operators

There’s not much value in computations whose output can’t be seen in the outside world. Use Stream::output to obtain an OutputHandle that exports a stream’s data. The constructor function passed to RootCircuit::build should return the OutputHandle (in addition to all the input handles as described above). After each step, the client code should take the new data from the OutputHandle, typically by calling OutputHandle::consolidate.

Use Stream::inspect to apply a callback function to each data item in a stream. The callback function has no return value and is executed only for its side effects, such as printing the data item to stdout. The inspect operator yields the same stream on its output.

It is not an operator, but Circuit::region can help with debugging by grouping operators into a named collection.

§Record-by-record mapping operators

These operators map one kind of batch to another, allowing the client to pass in a function that looks at individual records in a Z-set or other batch. These functions apply to both streams of deltas and streams of data.

The following methods are available for streams of indexed and non-indexed Z-sets. Each of these takes a function that accepts a key (for non-indexed Z-sets) or a key-value pair (for indexed Z-sets):

  • Use Stream::map to output a non-indexed Z-set using an arbitrary mapping function, or Stream::map_index to map to an indexed Z-set.

  • Use Stream::filter to drop records that do not satisfy a predicate function. The output stream has the same type as the input.

  • Use Stream::flat_map to output a Z-set that maps each input record to any number of records, or Stream::flat_map_index for indexed Z-set output. These methods also work as a filter_map equivalent.
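The record-level semantics can be sketched on the toy key-to-weight model (an assumption for illustration, not DBSP's representation). Note in particular that map may merge records whose images collide under the mapping function, summing their weights:

```rust
use std::collections::HashMap;

type ZSet<K> = HashMap<K, i64>;

/// Record-by-record map over a toy Z-set: apply `f` to every key and
/// sum the weights of records that map to the same output key.
fn zset_map<K, K2, F>(input: &ZSet<K>, f: F) -> ZSet<K2>
where
    K2: std::hash::Hash + Eq,
    F: Fn(&K) -> K2,
{
    let mut out = ZSet::new();
    for (k, w) in input {
        *out.entry(f(k)).or_insert(0) += w;
    }
    out
}

fn main() {
    let words: ZSet<&str> = HashMap::from([("Foo", 1), ("foo", 2), ("bar", 1)]);
    // Lower-casing merges "Foo" and "foo" into one record with weight 3.
    let lower = zset_map(&words, |k| k.to_lowercase());
    assert_eq!(lower.get("foo"), Some(&3));
    assert_eq!(lower.get("bar"), Some(&1));
}
```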

§Value-by-value mapping operators

Sometimes it makes sense to map a stream’s whole data item instead of breaking Z-sets down into records. Unlike the record-by-record functions, these functions work with streams that carry any type, not just a Z-set or batch. These functions apply to both streams of deltas and streams of data.

Use Stream::apply to apply a mapping function to each item in a given stream. Stream::apply_named, Stream::apply_owned, and Stream::apply_owned_named offer small variations.

Use Stream::apply2 or Stream::apply2_owned to apply a mapping function to pairs of items drawn from two different input streams.

§Addition and subtraction operators

Arithmetic operators work on Z-sets (and batches) by operating on weights. For example, adding two Z-sets adds the weights of records with the same key-value pair. They also work on streams of arithmetic types.

Use Stream::neg to map a stream to its negation, that is, to negate the weights for a Z-set.

Use Stream::plus to add two streams and Stream::sum to add an arbitrary number of streams. Use Stream::minus to subtract streams.

There aren’t any multiplication or division operators, because there is no clear interpretation of them for Z-sets. You can use Stream::apply and Stream::apply2, as already described, to do arbitrary arithmetic on one or two streams of arithmetic types.
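On the toy key-to-weight model (an illustration, not DBSP's representation), plus, neg, and minus reduce to pointwise weight arithmetic, with weight-zero entries dropping out:

```rust
use std::collections::HashMap;

type ZSet = HashMap<&'static str, i64>;

/// `plus` on toy Z-sets: add weights pointwise and drop zero entries.
fn zset_plus(a: &ZSet, b: &ZSet) -> ZSet {
    let mut out = a.clone();
    for (k, w) in b {
        let e = out.entry(*k).or_insert(0);
        *e += *w;
        if *e == 0 {
            out.remove(k);
        }
    }
    out
}

/// `neg`: flip the sign of every weight.
fn zset_neg(a: &ZSet) -> ZSet {
    a.iter().map(|(k, w)| (*k, -w)).collect()
}

fn main() {
    let a = HashMap::from([("x", 2), ("y", 1)]);
    let b = HashMap::from([("x", -2), ("z", 5)]);
    // minus(a, b) is plus(a, neg(b)).
    assert_eq!(zset_plus(&a, &b), HashMap::from([("y", 1), ("z", 5)]));
    // A Z-set plus its negation cancels to the empty Z-set.
    assert_eq!(zset_plus(&a, &zset_neg(&a)), HashMap::new());
}
```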

§Stream type conversion operators

These operators convert among streams of deltas, streams of data, and streams of “upserts”. Client code can use them, but they’re more often useful for testing (for example, for checking that incremental operators are equivalent to non-incremental ones) or for building other operators.

Use Stream::integrate to sum up the values within an input stream. The first output value is the first input value, the second output value is the sum of the first two inputs, and so on. This effectively converts a stream of deltas into a stream of data.

Stream::stream_fold generalizes integration. On each step, it calls a function to fold the input value with an accumulator and outputs the accumulator. The client provides the function and the initial value of the accumulator.

Use Stream::differentiate to calculate differences between subsequent values in an input stream. The first output is the first input value, the second output is the second input value minus the first input value, and so on. This effectively converts a stream of data into a stream of deltas. You shouldn’t ordinarily need this operator, at least not for Z-sets, because DBSP operators are fully incremental.
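The two conversions are mutual inverses, which is easy to see on a plain stream of integers (a sketch of the semantics, not of DBSP's implementation):

```rust
/// `integrate`: each output is the sum of all inputs so far.
fn integrate(stream: &[i64]) -> Vec<i64> {
    stream
        .iter()
        .scan(0, |acc, x| {
            *acc += x;
            Some(*acc)
        })
        .collect()
}

/// `differentiate`: each output is the current input minus the previous one.
fn differentiate(stream: &[i64]) -> Vec<i64> {
    let mut prev = 0;
    stream
        .iter()
        .map(|x| {
            let d = x - prev;
            prev = *x;
            d
        })
        .collect()
}

fn main() {
    let deltas = [1, 2, -1, 3];
    let data = integrate(&deltas);
    assert_eq!(data, vec![1, 3, 2, 5]);
    // Differentiation undoes integration, recovering the deltas.
    assert_eq!(differentiate(&data), deltas.to_vec());
}
```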

Use Stream::upsert to convert a stream of “upserts” into a stream of deltas. The input stream carries “upserts”, or assignments of values to keys such that a subsequent assignment to a key assigned earlier replaces the earlier value. upsert turns these into a stream of deltas by internally tracking upserts that have already been seen.
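A sketch of the upsert-to-delta conversion, tracking the last value seen per key as described above (toy types for illustration; the real operator works on batches and handles deletions via its own input encoding):

```rust
use std::collections::HashMap;

/// Convert a stream of upserts `(key, Some(value))` / deletions `(key, None)`
/// into deltas by tracking the last value per key. Each delta is a list of
/// `((key, value), weight)` pairs.
fn upserts_to_deltas(
    upserts: &[(&'static str, Option<i64>)],
) -> Vec<Vec<((&'static str, i64), i64)>> {
    let mut current: HashMap<&str, i64> = HashMap::new();
    let mut deltas = Vec::new();
    for (key, new) in upserts {
        let mut delta = Vec::new();
        if let Some(old) = current.get(key).copied() {
            delta.push(((*key, old), -1)); // retract the previous value
        }
        match new {
            Some(v) => {
                delta.push(((*key, *v), 1)); // insert the new value
                current.insert(*key, *v);
            }
            None => {
                current.remove(key);
            }
        }
        deltas.push(delta);
    }
    deltas
}

fn main() {
    let deltas = upserts_to_deltas(&[("k", Some(1)), ("k", Some(2)), ("k", None)]);
    assert_eq!(deltas[0], vec![(("k", 1), 1)]);
    // Re-assigning "k" retracts the old value and inserts the new one.
    assert_eq!(deltas[1], vec![(("k", 1), -1), (("k", 2), 1)]);
    assert_eq!(deltas[2], vec![(("k", 2), -1)]);
}
```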

§Weighting and counting operators

Use Stream::dyn_weigh to multiply the weights in an indexed Z-set by a user-provided function of the key and value. This is equally appropriate for streams of data or deltas. This method outputs a non-indexed Z-set with just the keys from the input, discarding the values, which also means that weights will be added together in the case of equal input keys.

Stream provides methods to count the number of values per key in an indexed Z-set: Stream::weighted_count and Stream::distinct_count for streams of deltas, and Stream::stream_weighted_count and Stream::stream_distinct_count for streams of data (see their documentation below).

The “distinct” operator on a Z-set maps positive weights to 1 and all other weights to 0. Stream has two implementations:

  • Use Stream::distinct to incrementally process a stream of deltas. If the output stream were to be integrated, it would contain only records with weight 0 or 1. This operator internally integrates the stream of deltas, which means its memory consumption is proportional to the integrated data size.

  • Use Stream::stream_distinct to non-incrementally process a stream of data. It sets each record’s weight to 1 if it is positive and drops the others.
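The non-incremental case is the simpler one to sketch on the toy key-to-weight model (an illustration, not DBSP's representation):

```rust
use std::collections::HashMap;

type ZSet = HashMap<&'static str, i64>;

/// `stream_distinct` semantics on one Z-set: clamp positive weights to 1
/// and drop all other records.
fn distinct(z: &ZSet) -> ZSet {
    z.iter()
        .filter(|(_, w)| **w > 0)
        .map(|(k, _)| (*k, 1))
        .collect()
}

fn main() {
    let z = HashMap::from([("a", 3), ("b", 1), ("c", -2)]);
    // "a" is clamped to weight 1, "c" (negative weight) is dropped.
    assert_eq!(distinct(&z), HashMap::from([("a", 1), ("b", 1)]));
}
```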

§Join on equal keys

A DBSP equi-join takes batches a and b as input, finds all pairs of a record in a and a record in b with the same key, applies a given function F to those records’ key and values, and outputs a Z-set with F’s output.

DBSP implements two kinds of joins:

  • Joins of delta streams (“incremental” joins) for indexed Z-sets with the same key type. Use Stream::join for non-indexed Z-set output, or Stream::join_index for indexed Z-set output.

  • Joins of data streams (“nonincremental” joins), which work with any indexed batches. Use Stream::stream_join, which outputs a non-indexed Z-set.

    stream_join also works for joining a stream of deltas with an invariant stream of data where the latter is used as a lookup table.

    If the output of the join function grows monotonically as (k, v1, v2) tuples are fed to it in lexicographic order, then Stream::monotonic_stream_join is more efficient. One such monotonic function is a join function that returns (k, v1, v2) itself.

One way to implement a Cartesian product is to map unindexed Z-set inputs into indexed Z-sets with a unit key type, e.g. input.index_with(|k| ((), k)), and then use join or stream_join, as appropriate.
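The weight semantics of an equi-join can be sketched on a toy indexed Z-set (a nested map, assumed here for illustration): matching records pair up per key, and the weights of matched records multiply.

```rust
use std::collections::HashMap;

/// A toy indexed Z-set: key -> value -> weight.
type IndexedZSet<K, V> = HashMap<K, HashMap<V, i64>>;

/// Equi-join of two toy indexed Z-sets: for each key present in both,
/// pair every value from `a` with every value from `b`, apply `f`, and
/// multiply the weights of the matched records.
fn join<F>(
    a: &IndexedZSet<&str, i64>,
    b: &IndexedZSet<&str, i64>,
    f: F,
) -> HashMap<(i64, i64), i64>
where
    F: Fn(&str, i64, i64) -> (i64, i64),
{
    let mut out = HashMap::new();
    for (k, vals_a) in a {
        if let Some(vals_b) = b.get(k) {
            for (va, wa) in vals_a {
                for (vb, wb) in vals_b {
                    *out.entry(f(k, *va, *vb)).or_insert(0) += wa * wb;
                }
            }
        }
    }
    out
}

fn main() {
    let a: IndexedZSet<&str, i64> = HashMap::from([("k", HashMap::from([(10, 2)]))]);
    let b: IndexedZSet<&str, i64> = HashMap::from([
        ("k", HashMap::from([(20, 3)])),
        ("other", HashMap::from([(99, 1)])),
    ]);
    let joined = join(&a, &b, |_k, va, vb| (va, vb));
    // (10, 20) matches on "k" with weight 2 * 3 = 6; "other" has no match.
    assert_eq!(joined, HashMap::from([((10, 20), 6)]));
}
```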

§Other kinds of joins

Use Stream::antijoin for antijoins of delta streams. It takes indexed Z-set a and Z-set b with the same key type and yields the subset of a whose keys do not appear in b. b may be indexed or non-indexed and its value type does not matter.

Use Stream::dyn_semijoin_stream for semi-joins of data streams. It takes a batch a and non-indexed batch b with the same key type as a. It outputs a non-indexed Z-set of key-value tuples that contains all the pairs from a for which a key appears in b.

Use Stream::outer_join or Stream::outer_join_default for outer joins of delta streams. The former takes three functions, one for each of the cases (common keys, left key only, right key only), and the latter simplifies it by taking only a function for common keys and passing in the default for the missing value.

DBSP implements “range join” of data streams, which joins keys in a against ranges of keys in b. Stream::dyn_stream_join_range implements range join with non-indexed Z-set output, Stream::dyn_stream_join_range_index with indexed output.

§Aggregation

Aggregation applies a function (the “aggregation function”) to all of the values for a given key in an input stream, and outputs an indexed Z-set with the same keys as the input and the function’s output as values. The output of aggregation usually has fewer records than its input, because it outputs only one record per input key, regardless of the number of key-value pairs with that key.

DBSP implements two kinds of aggregation: incremental aggregation over streams of deltas with Stream::aggregate, and non-incremental aggregation over streams of data with Stream::stream_aggregate.

These aggregation functions all partition the aggregation by key, like GROUP BY in SQL. To aggregate all records in a non-indexed Z-set, map to an indexed Z-set with a unit key () before aggregating, then map again to remove the index if necessary, e.g.:

let max_auction_count = auction_counts
    .map_index(|(_auction, count)| ((), *count))
    .aggregate(Max)
    .map(|((), max_count)| *max_count);

§Rolling aggregates

DBSP supports rolling aggregation of time series data over a client-specified “rolling window” range. For this purpose, Rust unsigned integer types model times, larger integers corresponding to later times. The unit of time in use is relevant only for specifying the width of the aggregation window, with RelRange.

The DBSP logical time concept is unrelated to times used in rolling aggregation and other time-series operators. The former is used to establish the ordering in which updates are consumed by DBSP, while the latter model physical times when the corresponding events were generated, observed, or processed. In particular, the ordering of physical and logical timestamps doesn’t have to match. In other words, DBSP can process events out-of-order.

Rolling aggregation takes place within a “partition”, which is any convenient division of the data. It might correspond to a tenant ID, for example, if each tenant’s data is to be separately aggregated. To represent partitioning, rolling aggregation introduces the OrdPartitionedIndexedZSet type, which is an IndexedZSet with an arbitrary key type that specifies the partition (it may be () if all data is to be within a single partition) and a value type of the form (TS, V) where TS is the type used for time and V is the client’s value type.

Rolling aggregation does not reduce the size of data. It outputs one record for each input record.

DBSP has two kinds of rolling aggregation functions that differ based on their tolerance for updating aggregation results when new data arrives for an old moment in time:

§Windowing

Use Stream::dyn_window to extract a stream of deltas to windows from a stream of deltas. This can be useful for windowing outside the context of rolling aggregation.

Implementations§

Source§

impl<C: Clone, B: BatchReader> Stream<C, B>

Source

pub fn inner(&self) -> Stream<C, B::Inner>

Source§

impl<C: Clone, B: DynBatchReader> Stream<C, B>

Source

pub fn typed<TB>(&self) -> Stream<C, TB>
where TB: BatchReader<Inner = B>,

Source§

impl<C: Clone, D: DataTrait + ?Sized> Stream<C, Box<D>>

Source

pub unsafe fn typed_data<T>(&self) -> Stream<C, TypedBox<T, D>>
where T: DBData + Erase<D>,

Adds type information to self, wrapping each element in a TypedBox. This function is a noop at runtime.

§Safety

self must contain concrete values of type T.

Source§

impl<C: Circuit, T, D: ?Sized> Stream<C, TypedBox<T, D>>

Source

pub fn inner_data(&self) -> Stream<C, Box<D>>

Source§

impl<C: Circuit, T: DBData, D: DataTrait + ?Sized> Stream<C, TypedBox<T, D>>

Source

pub fn inner_typed(&self) -> Stream<C, T>

Source§

impl<C: Circuit, T: DBData> Stream<C, T>

Source

pub fn typed_box<D>(&self) -> Stream<C, TypedBox<T, D>>
where D: DataTrait + ?Sized, T: Erase<D>,

Source§

impl<C, D> Stream<C, D>

Source

pub fn local_node_id(&self) -> NodeId

Returns local node id of the operator or subcircuit that writes to this stream.

If the stream originates in a subcircuit, returns the id of the subcircuit node.

Source

pub fn origin_node_id(&self) -> &GlobalNodeId

Returns global id of the operator that writes to this stream.

If the stream originates in a subcircuit, returns id of the operator inside the subcircuit (or one of its subcircuits) that produces the contents of the stream.

Source

pub fn stream_id(&self) -> StreamId

Source

pub fn circuit(&self) -> &C

Reference to the circuit the stream belongs to.

Source

pub fn ptr_eq<D2>(&self, other: &Stream<C, D2>) -> bool

Source§

impl<C, D> Stream<C, D>
where C: Circuit,

Source

pub fn with_value(circuit: C, node_id: NodeId, val: RefStreamValue<D>) -> Self

Create a stream out of an existing RefStreamValue with node_id as the source.

Source

pub fn value(&self) -> RefStreamValue<D>

Source

pub fn export(&self) -> Stream<C::Parent, D>
where C::Parent: Circuit, D: 'static,

Export stream to the parent circuit.

Creates a stream in the parent circuit that contains the last value in self when the child circuit terminates.

This method currently only works for streams connected to a feedback Z1 operator and will panic for other streams.

Source

pub fn set_label(&self, key: &str, val: &str) -> Self

Call set_label on the node that produces this stream.

Source

pub fn get_label(&self, key: &str) -> Option<String>

Call get_label on the node that produces this stream.

Source

pub fn set_persistent_id(&self, name: Option<&str>) -> Self

Set persistent id for the operator that produces this stream.

Source

pub fn get_persistent_id(&self) -> Option<String>

Get persistent id for the operator that produces this stream.

Source§

impl<C, T> Stream<C, T>
where C: Circuit,

Source

pub fn set_persistent_mir_id(&self, id: &str)

Sets id as both the MIR node id and persistent id of the source node of the stream.

Source

pub fn get_mir_node_id(&self) -> Option<String>

Get the MIR node id label if any.

Source§

impl<C, T1> Stream<C, T1>
where C: Circuit, T1: Clone + 'static,

Source

pub fn apply<F, T2>(&self, func: F) -> Stream<C, T2>
where F: Fn(&T1) -> T2 + 'static, T2: Clone + 'static,

Returns a stream that contains func(&x) for each x in self. func cannot mutate captured state.

The operator will have a generic name for debugging and profiling purposes. Use Stream::apply_named, instead, to give it a specific name.

Source

pub fn apply_mut<F, T2>(&self, func: F) -> Stream<C, T2>
where F: FnMut(&T1) -> T2 + 'static, T2: Clone + 'static,

Returns a stream that contains func(&x) for each x in self. func can access and mutate captured state.

The operator will have a generic name for debugging and profiling purposes. Use Stream::apply_mut_named, instead, to give it a specific name.

Source

pub fn apply_named<N, F, T2>(&self, name: N, func: F) -> Stream<C, T2>
where N: Into<Cow<'static, str>>, F: Fn(&T1) -> T2 + 'static, T2: Clone + 'static,

Returns a stream that contains func(&x) for each x in self, giving the operator the given name for debugging and profiling purposes. func cannot mutate captured state.

Source

pub fn apply_mut_named<N, F, T2>(&self, name: N, func: F) -> Stream<C, T2>
where N: Into<Cow<'static, str>>, F: FnMut(&T1) -> T2 + 'static, T2: Clone + 'static,

Returns a stream that contains func(&x) for each x in self, giving the operator the given name for debugging and profiling purposes. func can access and mutate captured state.

Source

pub fn apply_owned<F, T2>(&self, func: F) -> Stream<C, T2>
where F: Fn(T1) -> T2 + 'static, T2: Clone + 'static,

Returns a stream that contains func(x) for each x in self. func cannot mutate captured state.

The operator will have a generic name for debugging and profiling purposes. Use Stream::apply_owned_named, instead, to give it a specific name.

Source

pub fn apply_mut_owned<F, T2>(&self, func: F) -> Stream<C, T2>
where F: FnMut(T1) -> T2 + 'static, T2: Clone + 'static,

Returns a stream that contains func(x) for each x in self. func can access and mutate captured state.

The operator will have a generic name for debugging and profiling purposes. Use Stream::apply_mut_owned_named, instead, to give it a specific name.

Source

pub fn apply_owned_named<N, F, T2>(&self, name: N, func: F) -> Stream<C, T2>
where N: Into<Cow<'static, str>>, F: Fn(T1) -> T2 + 'static, T2: Data,

Returns a stream that contains func(x) for each x in self, giving the operator the given name for debugging and profiling purposes. func cannot mutate captured state.

Source

pub fn apply_mut_owned_named<N, F, T2>(&self, name: N, func: F) -> Stream<C, T2>
where N: Into<Cow<'static, str>>, F: FnMut(T1) -> T2 + 'static, T2: Data,

Returns a stream that contains func(x) for each x in self, giving the operator the given name for debugging and profiling purposes. func can access and mutate captured state.

Source

pub fn apply_core<N, T2, O, B, F>( &self, name: N, owned: O, borrowed: B, fixpoint: F, ) -> Stream<C, T2>
where N: Into<Cow<'static, str>>, T2: Data, O: FnMut(T1) -> T2 + 'static, B: FnMut(&T1) -> T2 + 'static, F: Fn(Scope) -> bool + 'static,

Apply the ApplyCore operator to self with a custom name

Source§

impl<C, T1> Stream<C, T1>
where C: Circuit, T1: Clone + 'static,

Source

pub fn apply2<F, T2, T3>(&self, other: &Stream<C, T2>, func: F) -> Stream<C, T3>
where T2: Clone + 'static, T3: Clone + 'static, F: Fn(&T1, &T2) -> T3 + 'static,

Apply a user-provided binary function to inputs from two streams at each timestamp.

Source

pub fn apply2_owned<F, T2, T3>( &self, other: &Stream<C, T2>, func: F, ) -> Stream<C, T3>
where T2: Clone + 'static, T3: Clone + 'static, F: Fn(T1, &T2) -> T3 + 'static,

Apply a user-provided binary function to inputs from two streams at each timestamp, consuming the first input.

Source§

impl<C, T1> Stream<C, T1>
where C: Circuit, T1: Clone + 'static,

Source

pub fn apply3<F, T2, T3, T4>( &self, other1: &Stream<C, T2>, other2: &Stream<C, T3>, func: F, ) -> Stream<C, T4>
where T2: Clone + 'static, T3: Clone + 'static, T4: Clone + 'static, F: Fn(Cow<'_, T1>, Cow<'_, T2>, Cow<'_, T3>) -> T4 + 'static,

Apply a user-provided ternary function to inputs from three streams at each timestamp.

Source

pub fn apply3_with_preference<F, T2, T3, T4>( &self, self_preference: OwnershipPreference, other1: (&Stream<C, T2>, OwnershipPreference), other2: (&Stream<C, T3>, OwnershipPreference), func: F, ) -> Stream<C, T4>
where T2: Clone + 'static, T3: Clone + 'static, T4: Clone + 'static, F: Fn(Cow<'_, T1>, Cow<'_, T2>, Cow<'_, T3>) -> T4 + 'static,

Apply a user-provided ternary function to inputs at each timestamp.

Allows the caller to specify the ownership preference for each input stream.

Source§

impl<C, B> Stream<C, B>
where C: Circuit, B: Batch<Time = ()>, B::InnerBatch: Send,

Source

pub fn gather(&self, receiver_worker: usize) -> Stream<C, B>

Collect all shards of a stream at the same worker.

The output stream in receiver_worker will contain a union of all input batches across all workers. The output streams in all other workers will contain empty batches.

Source§

impl<C, IB> Stream<C, IB>
where C: Circuit, IB: Batch<Time = ()>, IB::InnerBatch: Send,

Source

pub fn shard(&self) -> Stream<C, IB>

Shard batches across multiple worker threads based on keys.

§Theory

We parallelize processing across N worker threads by creating a replica of the same circuit per thread and sharding data across replicas. To ensure correctness (i.e., that the sum of outputs produced by individual workers is equal to the output produced by processing the entire dataset by one worker), sharding must satisfy certain requirements determined by each operator. In particular, for distinct and aggregate, all tuples that share the same key must be processed by the same worker. For join, tuples from both input streams with the same key must be processed by the same worker.

Other operators, e.g., filter and flat_map, impose no restrictions on the sharding scheme: as long as each tuple in a batch is processed by some worker, the correct result will be produced. This is true for all linear operators.

The shard operator shards input batches based on the hash of the key, making sure that tuples with the same key always end up at the same worker. More precisely, the operator re-shards its input by partitioning batches in the input stream of each worker based on the hash of the key, distributing resulting fragments among peers and re-assembling fragments at each peer:

        ┌──────────────────┐
worker1 │                  │
───────►├─────┬───────────►├──────►
        │     │            │
───────►├─────┴───────────►├──────►
worker2 │                  │
        └──────────────────┘
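A hash-based sharding scheme can be sketched as follows; the hash function shown is a stand-in (the real shard operator's hash is an internal detail), but the invariant is the same: equal keys always map to the same worker.

```rust
use std::collections::hash_map::DefaultHasher;
use std::hash::{Hash, Hasher};

/// Assign a record to one of `num_workers` workers from its key.
/// Equal keys always land on the same worker, which is what operators
/// like join and aggregate require.
fn shard_of<K: Hash>(key: &K, num_workers: usize) -> usize {
    let mut h = DefaultHasher::new();
    key.hash(&mut h);
    (h.finish() as usize) % num_workers
}

fn main() {
    // Deterministic: the same key always maps to the same worker.
    assert_eq!(shard_of(&"alice", 4), shard_of(&"alice", 4));
    assert!(shard_of(&"alice", 4) < 4);
}
```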
§Usage

Most users do not need to invoke shard directly (and doing so is likely to lead to incorrect results unless you know exactly what you are doing). Instead, each operator re-shards its inputs as necessary, e.g., join applies shard to both of its input streams, while filter consumes its input directly without re-sharding.

§Performance considerations

In the current implementation, the shard operator introduces a synchronization barrier across all workers: its output at any worker is only produced once input batches have been collected from all workers. This limits the scalability since a slow worker (e.g., running on a busy CPU core or sharing the core with other workers) or uneven sharding can slow down the whole system and reduce gains from parallelization.

Source

pub fn shard_workers(&self, workers: Range<usize>) -> Stream<C, IB>

Shard batch across just the specified range of workers.

If workers contains just one worker, then Stream::gather is more efficient.

Source§

impl<C, D> Stream<C, D>
where D: Clone + 'static, C: Circuit,

Source

pub fn inspect<F>(&self, callback: F) -> Self
where F: FnMut(&D) + 'static,

Apply Inspect operator to self.

§Examples
use dbsp::{operator::Generator, Circuit, RootCircuit};

let circuit = RootCircuit::build(move |circuit| {
    let mut n = 1;
    let stream = circuit.add_source(Generator::new(move || {
        let res = n;
        n += 1;
        res
    }));
    // Print all values in `stream`.
    stream.inspect(|n| println!("inspect: {}", n));
    Ok(())
})
.unwrap();
Source§

impl<C, B> Stream<C, B>
where C: Circuit, B: Batch,

Source

pub fn accumulate(&self) -> Stream<C, Option<Spine<B>>>

Accumulate changes within a clock cycle in a spine.

Once per clock cycle, during flush, outputs a spine containing all input changes accumulated since the previous flush; outputs None at all other times.

This operator is a key part of efficient processing of long transactions. It is used in conjunction with stateful operators like join, aggregate, distinct, etc., to supply all inputs comprising a transaction at once, avoiding computing mutually canceling changes.

Using Spine to accumulate changes ensures that during a long transaction changes are pushed to storage and get compacted by background workers.

Source

pub fn accumulate_with_enable_count( &self, ) -> (Stream<C, Option<Spine<B>>>, Arc<AtomicUsize>)

Like Self::accumulate, but also returns a reference to the enable count of the accumulator.

Used to instantiate accumulators for output connectors. See Accumulator::enable_count documentation.

Source

pub fn accumulate_apply2<B2, F, T>( &self, other: &Stream<C, B2>, func: F, ) -> Stream<C, Option<T>>
where B2: Batch, F: Fn(TypedBatch<B::Key, B::Val, B::R, SpineSnapshot<B::Inner>>, TypedBatch<B2::Key, B2::Val, B2::R, SpineSnapshot<B2::Inner>>) -> T + 'static, T: Clone + 'static,

Source§

impl<C, D> Stream<C, D>
where C: Circuit, D: 'static + Clone,

Source

pub fn condition<F>(&self, condition_func: F) -> Condition<C>
where F: 'static + Fn(&D) -> bool,

Attach a condition to a stream.

A Condition is a condition on the value in the stream checked on each clock cycle, that can be used to terminate the execution of the subcircuit (see ChildCircuit::iterate_with_condition and ChildCircuit::iterate_with_conditions).

Source§

impl<C, Z> Stream<C, Z>

Source

pub fn weighted_count(&self) -> Stream<C, OrdIndexedZSet<Z::Key, ZWeight>>

Incrementally sums the weights for each key in self into an indexed Z-set that maps from the original keys to the weights. Both the input and output are streams of updates.
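The per-key sum can be sketched on a single toy indexed Z-set (a nested map, assumed for illustration; the real operator is incremental and emits its counts as an indexed Z-set of updates):

```rust
use std::collections::HashMap;

/// `weighted_count` semantics on one toy indexed Z-set (key -> value -> weight):
/// sum the weights of all values under each key.
fn weighted_count(
    z: &HashMap<&'static str, HashMap<i64, i64>>,
) -> HashMap<&'static str, i64> {
    z.iter()
        .map(|(k, vals)| (*k, vals.values().sum()))
        .collect()
}

fn main() {
    let z = HashMap::from([("k", HashMap::from([(10, 2), (11, 3)]))]);
    // Two values under "k" with weights 2 and 3 sum to 5.
    assert_eq!(weighted_count(&z), HashMap::from([("k", 5)]));
}
```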

Source

pub fn weighted_count_generic<O>(&self) -> Stream<C, O>
where O: IndexedZSet<Key = Z::Key, DynK = Z::DynK, DynV = DynData>,

Like Self::dyn_weighted_count, but can return any batch type.

Source

pub fn distinct_count(&self) -> Stream<C, OrdIndexedZSet<Z::Key, ZWeight>>

Incrementally, for each key in self, counts the number of unique values having positive weights, and outputs it as an indexed Z-set that maps from the original keys to the unique value counts. Both the input and output are streams of updates.

Source

pub fn distinct_count_generic<O>(&self) -> Stream<C, O>
where O: IndexedZSet<Key = Z::Key, DynK = DynData>,

Like Self::dyn_distinct_count, but can return any batch type.

Source

pub fn stream_weighted_count( &self, ) -> Stream<C, OrdIndexedZSet<Z::Key, ZWeight>>

Non-incrementally sums the weights for each key in self into an indexed Z-set that maps from the original keys to the weights. Both the input and output are streams of data (not updates).

Source

pub fn stream_weighted_count_generic<O>(&self) -> Stream<C, O>
where O: IndexedZSet<Key = Z::Key, DynK = Z::DynK, Val = ZWeight, DynV = DynData>,

Like Self::dyn_stream_weighted_count, but can return any batch type.

Source

pub fn stream_distinct_count( &self, ) -> Stream<C, OrdIndexedZSet<Z::Key, ZWeight>>

Non-incrementally, for each key in self, counts the number of unique values having positive weights, and outputs the result as an indexed Z-set that maps from the original keys to the unique value counts. Both the input and output are streams of data (not updates).

Source

pub fn stream_distinct_count_generic<O>(&self) -> Stream<C, O>
where O: IndexedZSet<Key = Z::Key, DynK = Z::DynK, Val = ZWeight, DynV = DynData>,

Like Self::dyn_stream_distinct_count, but can return any batch type.

Source§

impl<C, D> Stream<C, D>
where D: HasZero + Clone + 'static, C: Circuit,

Source

pub fn delta0<CC>(&self, subcircuit: &CC) -> Stream<CC, D>
where CC: Circuit<Parent = C>,

Import self from the parent circuit to subcircuit via the Delta0 operator.

See Delta0 operator documentation.

Source

pub fn delta0_with_preference<CC>( &self, subcircuit: &CC, input_preference: OwnershipPreference, ) -> Stream<CC, D>
where CC: Circuit<Parent = C>,

Like Self::delta0, but overrides the ownership preference on the input stream with input_preference.

Source§

impl<C, D> Stream<C, D>
where C: Circuit + 'static, D: Checkpoint + SizeOf + NumEntries + GroupValue,

Source

pub fn differentiate(&self) -> Stream<C, D>

Stream differentiation.

Computes the difference between current and previous value of self: differentiate(a) = a - z^-1(a). The first output is the first input value, the second output is the second input value minus the first input value, and so on.

You shouldn’t ordinarily need this operator, at least not for streams of Z-sets, because most DBSP operators are fully incremental.
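The definition differentiate(a) = a - z^-1(a) can be sketched in plain Rust over a finite prefix of a stream (a semantics model, not the DBSP API; `differentiate_model` is a hypothetical name):

```rust
/// Model of stream differentiation: differentiate(a)[i] = a[i] - a[i-1],
/// with a[-1] taken to be zero.
fn differentiate_model(input: &[i64]) -> Vec<i64> {
    let mut prev = 0;
    input
        .iter()
        .map(|&x| {
            let d = x - prev;
            prev = x;
            d
        })
        .collect()
}

fn main() {
    // Differentiating a running sum recovers the original increments,
    // i.e., differentiation inverts integration.
    assert_eq!(differentiate_model(&[1, 3, 6, 10]), vec![1, 2, 3, 4]);
}
```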

Source

pub fn differentiate_nested(&self) -> Stream<C, D>

Nested stream differentiation.

Source§

impl<C, T, K, V, B> Stream<ChildCircuit<C, T>, TypedBatch<K, V, ZWeight, B>>
where C: Clone + 'static, K: DBData + Erase<B::Key>, V: DBData + Erase<B::Val>, T: Timestamp, B: DynIndexedZSet, TypedBatch<K, V, ZWeight, B>: Checkpoint + SizeOf + NumEntries + Clone + 'static,

Source

pub fn accumulate_differentiate( &self, ) -> Stream<ChildCircuit<C, T>, TypedBatch<K, V, ZWeight, B>>

Accumulates changes within a clock cycle and differentiates accumulated changes across clock cycles.

Source§

impl<C, D> Stream<C, D>
where C: Circuit + 'static, D: Checkpoint + SizeOf + NumEntries + Neg<Output = D> + Clone + AddByRef + AddAssignByRef + NegByRef + Eq + 'static,

Source

pub fn differentiate_with_initial_value(&self, initial: D) -> Stream<C, D>

Source§

impl<C, D> Stream<C, D>
where C: Circuit, D: Checkpoint + AddByRef + AddAssignByRef + Clone + Eq + HasZero + SizeOf + NumEntries + 'static,

Source

pub fn integrate(&self) -> Stream<C, D>

Integrate the input stream.

Computes the sum of values in the input stream. The first output value is the first input value, the second output value is the sum of the first two inputs, and so on.

§Examples
let circuit = RootCircuit::build(move |circuit| {
    // Generate a stream of 1's.
    let stream = circuit.add_source(Generator::new(|| 1));
    stream.inspect(move |n| eprintln!("{n}"));
    // Integrate the stream.
    let integral = stream.integrate();
    integral.inspect(move |n| eprintln!("{n}"));
    let mut counter1 = 0;
    eprintln!("{counter1}");
    integral.inspect(move |n| {
        counter1 += 1;
        assert_eq!(*n, counter1)
    });
    let mut counter2 = 0;
    integral.delay().inspect(move |n| {
        assert_eq!(*n, counter2);
        counter2 += 1;
    });
    Ok(())
})
.unwrap()
.0;

for _ in 0..5 {
    circuit.transaction().unwrap();
}

The above example generates the following input/output mapping:

input:  1, 1, 1, 1, 1, ...
output: 1, 2, 3, 4, 5, ...
Source

pub fn integrate_nested(&self) -> Stream<C, D>

Integrate stream of streams.

Computes the sum of nested streams. Rather than integrating values within each nested stream, this function sums entire input streams across all parent timestamps, where the sum of streams is defined as a stream of point-wise sums of their elements: integral[i,j] = sum(input[k,j]) for k <= i, where stream[i,j] is the value of stream at time [i,j], i is the parent timestamp, and j is the child timestamp.

Yields the sum element-by-element as the input stream is fed to the integral.

§Examples

Input stream (one row per parent timestamp):

1 2 3 4
1 1 1 1 1
2 2 2 0 0

Integral:

1 2 3 4
2 3 4 5 1
4 5 6 5 1
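The example above can be reproduced with a small plain-Rust model (not the DBSP API; `integrate_nested_model` is a hypothetical name) that keeps a running point-wise sum of the rows, padding shorter rows with zeros:

```rust
/// Model of nested integration: integral[i][j] = sum of input[k][j] for
/// k <= i, treating missing elements of shorter rows as zero.
fn integrate_nested_model(input: &[Vec<i64>]) -> Vec<Vec<i64>> {
    let mut acc: Vec<i64> = Vec::new();
    let mut out = Vec::new();
    for row in input {
        if row.len() > acc.len() {
            acc.resize(row.len(), 0);
        }
        for (j, x) in row.iter().enumerate() {
            acc[j] += x;
        }
        out.push(acc.clone());
    }
    out
}

fn main() {
    // The three parent-timestamp rows from the example above.
    let input = [vec![1, 2, 3, 4], vec![1, 1, 1, 1, 1], vec![2, 2, 2, 0, 0]];
    let integral = integrate_nested_model(&input);
    assert_eq!(integral[1], vec![2, 3, 4, 5, 1]);
    assert_eq!(integral[2], vec![4, 5, 6, 5, 1]);
}
```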
Source§

impl<C, T, K, V, R, B> Stream<ChildCircuit<C, T>, TypedBatch<K, V, R, B>>
where C: Clone + 'static, T: Timestamp, K: DBData + Erase<B::Key>, V: DBData + Erase<B::Val>, R: DBWeight + Erase<B::R>, B: DynIndexedZSet + Checkpoint,

Source

pub fn accumulate_integrate( &self, ) -> Stream<ChildCircuit<C, T>, TypedBatch<K, V, R, B>>

Integrate the input stream, updating the output once per clock tick.

Source§

impl<C, D> Stream<C, D>
where D: Clone + 'static + Neg<Output = D> + NegByRef, C: Circuit,

Source

pub fn neg(&self) -> Stream<C, D>

Returns a stream with the same type as self in which each value is negated. Negating an indexed Z-set negates all the weights.

Source§

impl<T> Stream<RootCircuit, T>
where T: Debug + Clone + Send + 'static,

Source

pub fn output(&self) -> OutputHandle<T>

Create an output handle that makes the contents of self available outside the circuit.

This API makes the result of the computation performed by the circuit available to the outside world. At each clock cycle, the contents of the stream are buffered inside the handle and can be read using the OutputHandle API.

Source

pub fn output_persistent(&self, persistent_id: Option<&str>) -> OutputHandle<T>

Source

pub fn output_persistent_with_gid( &self, persistent_id: Option<&str>, ) -> (OutputHandle<T>, GlobalNodeId)

Source

pub fn output_guarded( &self, guard: &Stream<RootCircuit, bool>, ) -> OutputHandle<T>

Create an output handle that makes the contents of self available outside the circuit on demand.

This operator is similar to output, but it only produces the output conditionally, when the value in the guard stream is true. When guard is false, the output mailbox remains empty at the end of the clock cycle, and OutputHandle::take_from_worker will return None. This operator can be used to output a large collection, such as an integral of a stream, on demand.

Source§

impl<B> Stream<RootCircuit, B>
where B: Batch + Send,

Source

pub fn accumulate_output(&self) -> OutputHandle<SpineSnapshot<B>>

Output operator that produces a single accumulated output per clock cycle.

Source

pub fn accumulate_output_persistent( &self, persistent_id: Option<&str>, ) -> OutputHandle<SpineSnapshot<B>>

Source

pub fn accumulate_output_persistent_with_gid( &self, persistent_id: Option<&str>, ) -> (OutputHandle<SpineSnapshot<B>>, Arc<AtomicUsize>, GlobalNodeId)

Returns:

  • The output handle.
  • The enable count of the accumulator. Can be used to enable/disable the accumulator.
  • The global node ID of the output operator.
Source§

impl<C, D> Stream<C, D>
where C: Circuit, D: AddByRef + AddAssignByRef + Clone + 'static,

Source

pub fn plus(&self, other: &Stream<C, D>) -> Stream<C, D>

Apply the Plus operator to self and other. Adding two indexed Z-sets adds the weights of matching key-value pairs.

The stream type’s addition operation must be commutative.

§Examples
let circuit = RootCircuit::build(move |circuit| {
    // Stream of non-negative values: 0, 1, 2, ...
    let mut n = 0;
    let source1 = circuit.add_source(Generator::new(move || {
        let res = n;
        n += 1;
        res
    }));
    // Stream of non-positive values: 0, -1, -2, ...
    let mut n = 0;
    let source2 = circuit.add_source(Generator::new(move || {
        let res = n;
        n -= 1;
        res
    }));
    // Compute pairwise sums of values in the stream; the output stream will contain zeros.
    source1.plus(&source2).inspect(|n| assert_eq!(*n, 0));
    Ok(())
})
.unwrap()
.0;
Source§

impl<C, D> Stream<C, D>
where C: Circuit, D: AddByRef + AddAssignByRef + Neg<Output = D> + NegByRef + Clone + 'static,

Source

pub fn minus(&self, other: &Stream<C, D>) -> Stream<C, D>

Apply the Minus operator to self and other. Subtracting two indexed Z-sets subtracts the weights of matching key-value pairs.

Source§

impl<T> Stream<RootCircuit, T>
where T: Clone + 'static,

Source

pub fn stream_fold<A, F>(&self, init: A, fold_func: F) -> Stream<RootCircuit, A>
where F: Fn(A, &T) -> A + 'static, A: Checkpoint + Eq + Clone + SizeOf + NumEntries + 'static,

Folds every element in the input stream into an accumulator and outputs the current value of the accumulator at every clock cycle.

§Arguments
  • init - initial value of the accumulator.
  • fold_func - closure that computes the new value of the accumulator as a function of the previous value and the new input at each clock cycle.
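The folding behavior over a finite prefix of a stream can be sketched in plain Rust (a model, not the DBSP API; `stream_fold_model` is a hypothetical name):

```rust
/// Model of `stream_fold`: at each step, fold the new input into the
/// accumulator and emit the accumulator's current value.
fn stream_fold_model<A: Clone, T>(
    input: &[T],
    init: A,
    fold_func: impl Fn(A, &T) -> A,
) -> Vec<A> {
    let mut acc = init;
    let mut out = Vec::new();
    for x in input {
        acc = fold_func(acc, x);
        out.push(acc.clone());
    }
    out
}

fn main() {
    // Folding with addition reproduces integration.
    let sums = stream_fold_model(&[1, 2, 3], 0i64, |acc, x| acc + x);
    assert_eq!(sums, vec![1, 3, 6]);
}
```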
Source

pub fn stream_fold_persistent<A, F>( &self, persistent_id: Option<&str>, init: A, fold_func: F, ) -> Stream<RootCircuit, A>
where F: Fn(A, &T) -> A + 'static, A: Checkpoint + Eq + Clone + SizeOf + NumEntries + 'static,

Source§

impl<C, D> Stream<C, D>
where C: Circuit, D: AddByRef + AddAssignByRef + Clone + NumEntries + 'static,

Source

pub fn sum<'a, I>(&'a self, streams: I) -> Stream<C, D>
where I: IntoIterator<Item = &'a Self>,

Apply the Sum operator to self and all streams in streams. The first output is the sum of the first input from each input stream, the second output is the sum of the second input from each input stream, and so on.

Source§

impl<C, D> Stream<C, D>
where C: Circuit,

Source

pub fn transaction_delay_with_initial_value(&self, initial: D) -> Stream<C, D>
where D: Checkpoint + SizeOf + NumEntries + Clone + 'static,

Applies TransactionZ1 operator to self.

Source§

impl<C, D> Stream<C, D>
where C: Circuit,

Source

pub fn delay(&self) -> Stream<C, D>
where D: Checkpoint + Eq + SizeOf + NumEntries + Clone + HasZero + 'static,

Applies Z1 operator to self.

Source

pub fn delay_with_initial_value(&self, initial: D) -> Stream<C, D>
where D: Checkpoint + Eq + SizeOf + NumEntries + Clone + 'static,

Source

pub fn delay_nested(&self) -> Stream<C, D>
where D: Eq + Clone + HasZero + SizeOf + NumEntries + 'static,

Applies Z1Nested operator to self.

Source§

impl<C, K, V, R, B> Stream<C, TypedBatch<K, V, R, B>>
where C: Circuit, B: DynBatch<Time = ()>, K: DBData + Erase<B::Key>, V: DBData + Erase<B::Val>, R: DBWeight + Erase<B::R>,

Source

pub fn accumulate_trace( &self, ) -> Stream<C, TypedBatch<K, V, R, TimedSpine<B, C>>>

Record batches in self in a trace.

This operator labels each untimed batch in the stream with the current timestamp and adds it to a trace.

It updates the output trace once per transaction, on flush.

Source

pub fn accumulate_trace_with_bound<T>( &self, lower_key_bound: TraceBound<B::Key>, lower_val_bound: TraceBound<B::Val>, ) -> Stream<C, TypedBatch<K, V, R, TimedSpine<B, C>>>

Record batches in self in a trace with bounds lower_key_bound and lower_val_bound.

           ┌──────────┐    ┌────────────────────────┐ trace
self ─────►│accumulate├───►│ AccumulateTraceAppend  │───────────────┬────► output
           └──────────┘    └────────────────────────┘               │
                             ▲                                      │
                             │                                      │
                             │              ┌─────────────────┐     │ z1feedback
                             └──────────────┤AccumulateZ1Trace├◄────┘
                              delayed_trace └─────────────────┘
Source§

impl<C, B> Stream<C, B>
where C: Circuit, B: Batch<Time = ()>,

Source

pub fn accumulate_integrate_trace_retain_keys<TS, RK>( &self, bounds_stream: &Stream<C, TypedBox<TS, DynData>>, retain_key_func: RK, )
where TS: DBData + Erase<DynData>, RK: Fn(&B::Key, &TS) -> bool + Clone + Send + Sync + 'static,

Applies a retainment policy to keys in the integral of self.

§Background

Relations that store time series data typically have the property that any new updates can only affect records with recent timestamps. Depending on how the relation is used in queries this might mean that, while records with older timestamps still exist in the relation, they cannot affect any future incremental computation and therefore don’t need to be stored.

§Design

We support two mechanisms to specify and eventually discard such unused records.

The first mechanism, exposed via the accumulate_integrate_trace_with_bound method, is only applicable when keys and/or values in the collection are ordered by time. It allows each consumer of the trace to specify a lower bound on the keys and values it is interested in. The effective bound is the minimum of all bounds specified by individual consumers.

The second mechanism, implemented by this method and the accumulate_integrate_trace_retain_values method, is more general and allows the caller to specify arbitrary conditions on keys and values in the trace, respectively. Keys or values that don’t satisfy the condition are eventually reclaimed by the trace. This mechanism is applicable to collections that are not ordered by time, and hence doesn’t require rearranging the data in time order. Furthermore, it is applicable to collections that contain multiple timestamp columns. Such multidimensional timestamps only form a partial order.

Unlike the first mechanism, this mechanism only allows one global condition to be applied to the stream. This bound affects all operators that use the trace of the stream, i.e., call integrate_trace (or trace in the root scope) on it. This includes, for instance, join, aggregate, and distinct. All such operators reference the same instance of the trace. Therefore bounds specified by this API must be based on a global analysis of the entire program.

The two mechanisms described above interact in different ways for keys and values. For keys, the lower bound and the retainment condition are independent and can be active at the same time. Internally, they are enforced using different techniques. Lower bounds are enforced at essentially zero cost. The retention condition is more expensive, but more general.

For values, only one of the two mechanisms can be enabled for any given stream. Whenever a retainment condition is specified it supersedes any lower bounds constraints.

§Arguments
  • bounds_stream - This stream carries scalar values (i.e., single records, not Z-sets). The key retainment condition is defined relative to the last value received from this stream. Typically, this value represents the lowest upper bound of all partially ordered timestamps in self or some other stream, computed with the help of the waterline operator and adjusted by some constant offsets dictated, e.g., by window sizes used in the queries and the maximal out-of-orderness of data in the input streams.

  • retain_key_func - given the value received from the bounds_stream at the last clock cycle and a key, returns true if the key should be retained in the trace and false if it should be discarded.

§Correctness
  • As discussed above, the retainment policy set using this method applies to all consumers of the trace. An incorrect policy may reclaim keys that are still needed by some of the operators, leading to incorrect results. Computing a correct retainment policy can be a subtle and error-prone task, which is probably best left to automatic tools like compilers.

  • The retainment policy set using this method only applies to self, but not to any stream derived from it. In particular, if self is re-sharded using the shard operator, then it may be necessary to call integrate_trace_retain_keys on the resulting stream. In general, computing a correct retainment policy requires keeping track of:

    • Streams that are sharded by construction and hence the shard operator is a no-op for such streams. For instance, the add_input_set and aggregate operators produce sharded streams.
    • Operators that shard their input streams, e.g., join.
  • This method should be invoked at most once for a stream.

  • retain_key_func must be monotone in its first argument: for any timestamp ts1 and key k such that retain_key_func(ts1, k) = false, and for any ts2 >= ts1 it must hold that retain_key_func(ts2, k) = false, i.e., once a key is rejected, it will remain rejected as the bound increases.
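A typical monotone retain_key_func keeps keys whose timestamp lies within a fixed window below the current bound. A plain-Rust sketch (names and the `LATENESS` constant are illustrative assumptions, not the DBSP API):

```rust
/// Keys are timestamps here: keep a key if it is no more than LATENESS
/// ticks below the current bound. This is monotone in the bound: once a
/// key is rejected, raising the bound keeps it rejected.
const LATENESS: u64 = 100;

fn retain_key(key_ts: &u64, bound: &u64) -> bool {
    *key_ts + LATENESS >= *bound
}

fn main() {
    // Monotonicity check: a key rejected at bound 250 stays rejected
    // at the larger bound 300.
    assert!(!retain_key(&100, &250));
    assert!(!retain_key(&100, &300));
    // A recent key within the window is retained.
    assert!(retain_key(&200, &250));
}
```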

Source

pub fn accumulate_integrate_trace_retain_values<TS, RV>( &self, bounds_stream: &Stream<C, TypedBox<TS, DynData>>, retain_value_func: RV, )
where TS: DBData + Erase<DynData>, RV: Fn(&B::Val, &TS) -> bool + Clone + Send + Sync + 'static,

Similar to accumulate_integrate_trace_retain_keys, but applies a retainment policy to values in the trace.

Source

pub fn accumulate_integrate_trace_retain_values_last_n<TS, RV>( &self, bounds_stream: &Stream<C, TypedBox<TS, DynData>>, retain_value_func: RV, n: usize, )
where TS: DBData + Erase<DynData>, RV: Fn(&B::Val, &TS) -> bool + Clone + Send + Sync + 'static,

Applies a retainment policy that keeps all values above the threshold in bounds_stream and up to n latest values before the threshold.

Notifies the garbage collector that it should preserve all values that satisfy the predicate and the last n values before the first value that satisfies the predicate for each key. If no value associated with a key satisfies the predicate, the last n values are preserved.

Used to garbage collect streams that need to preserve a fixed number of values below a waterline, regardless of how far in the past they are. Examples include the right-hand side of an asof join and inputs to top-k operators.

IMPORTANT: this method assumes that for each key in self, values are sorted in such a way that once the retain_value_func predicate is satisfied for a value, it is also satisfied for all subsequent values.

§Arguments
  • bounds_stream - This stream carries scalar values (i.e., single records, not Z-sets). The key retainment condition is defined relative to the last value received from this stream. Typically, this value represents the lowest upper bound of all partially ordered timestamps in self or some other stream, computed with the help of the waterline operator and adjusted by some constant offsets dictated, e.g., by window sizes used in the queries and the maximal out-of-orderness of data in the input streams.

  • retain_value_func - given the value received from the bounds_stream at the last clock cycle and a value, returns true if the value should be retained in the trace and false if it should be discarded.

  • n - the number of values to preserve.

Source

pub fn accumulate_integrate_trace_retain_values_top_n<TS, RV>( &self, bounds_stream: &Stream<C, TypedBox<TS, DynData>>, retain_value_func: RV, n: usize, )
where TS: DBData + Erase<DynData>, RV: Fn(&B::Val, &TS) -> bool + Clone + Send + Sync + 'static,

Applies a retainment policy that keeps all values above the threshold in bounds_stream and up to n largest values below the threshold.

This is similar to accumulate_integrate_trace_retain_values_last_n, but it does not assume that values in the group are sorted according to a timestamp.

Can be used to GC the MAX aggregate or top-k group transformers.

§Arguments
  • bounds_stream - This stream carries scalar values (i.e., single records, not Z-sets). The key retainment condition is defined relative to the last value received from this stream. Typically, this value represents the lowest upper bound of all partially ordered timestamps in self or some other stream, computed with the help of the waterline operator and adjusted by some constant offsets dictated, e.g., by window sizes used in the queries and the maximal out-of-orderness of data in the input streams.

  • retain_value_func - given the value received from the bounds_stream at the last clock cycle and a value, returns true if the value should be retained in the trace and false if it should be discarded.

  • n - the number of values to preserve.

Source

pub fn accumulate_integrate_trace_retain_values_bottom_n<TS, RV>( &self, bounds_stream: &Stream<C, TypedBox<TS, DynData>>, retain_value_func: RV, n: usize, )
where TS: DBData + Erase<DynData>, RV: Fn(&B::Val, &TS) -> bool + Clone + Send + Sync + 'static,

Similar to accumulate_integrate_trace_retain_values_top_n, but keeps the bottom n values.

Source

pub fn accumulate_integrate_trace(&self) -> Stream<C, Spine<B>>

Constructs and returns an untimed trace of this stream.

The trace is unbounded, meaning that data will not be discarded because it has a low key or value. Filter functions set with accumulate_integrate_trace_retain_keys or accumulate_integrate_trace_retain_values can still discard data.

The result batch is stored durably for fault tolerance.

Source

pub fn accumulate_integrate_trace_with_bound( &self, lower_key_bound: TraceBound<<B::Inner as DynBatchReader>::Key>, lower_val_bound: TraceBound<<B::Inner as DynBatchReader>::Val>, ) -> Stream<C, Spine<B>>
where Spine<B>: SizeOf,

Constructs and returns an untimed trace of this stream.

Data in the trace with a key less than lower_key_bound or value less than lower_val_bound can be discarded, although these bounds can be lowered later (discarding less data). Filter functions set with accumulate_integrate_trace_retain_keys or accumulate_integrate_trace_retain_values can still discard data.

The result batch is stored durably for fault tolerance.

Updates the output trace once per transaction, on flush.

Source§

impl<C, Z> Stream<C, Z>
where C: Circuit,

Source

pub fn stream_aggregate<A>( &self, aggregator: A, ) -> Stream<C, OrdIndexedZSet<Z::Key, A::Output>>
where Z: IndexedZSet<DynK = DynData>, Z::InnerBatch: Send, A: Aggregator<Z::Val, (), ZWeight>,

Aggregate values associated with each key in an indexed Z-set.

An indexed Z-set IndexedZSet<K, V, R> maps each key into a set of (value, weight) tuples (V, R). These tuples form a nested Z-set ZSet<V, R>. This method applies aggregator to each such Z-set and adds the resulting aggregate to the output indexed Z-set with weight +1.

Min, Max, and Fold are provided as example Aggregators.

Source

pub fn stream_aggregate_generic<A, O>(&self, aggregator: A) -> Stream<C, O>
where Z: Batch<Time = ()>, Z::InnerBatch: Send, A: Aggregator<Z::Val, (), Z::R>, A::Output: Erase<O::DynV>, O: IndexedZSet<Key = Z::Key, DynK = Z::DynK, Val = A::Output>,

Like Self::stream_aggregate, but can return any batch type.

Source

pub fn stream_aggregate_linear<F, A>( &self, f: F, ) -> Stream<C, OrdIndexedZSet<Z::Key, A>>
where Z: IndexedZSet<DynK = DynData>, A: DBWeight + MulByRef<ZWeight, Output = A>, F: Fn(&Z::Val) -> A + Clone + 'static,

A version of Self::dyn_stream_aggregate optimized for linear aggregation functions.

This method only works for linear aggregation functions f, i.e., functions that satisfy f(a+b) = f(a) + f(b), where the first “+” is the Z-set union of the Z-sets composed of tuples a and b. This function will produce incorrect results if f is not linear. The input stream is a Z-set of (key, value) pairs, but the function only receives the “value” part as an input.
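The linearity property can be checked concretely: a linear aggregate over a group is the weighted sum of f over its tuples, and that sum distributes over Z-set union. A plain-Rust model (not the DBSP API; `linear_aggregate` is a hypothetical name):

```rust
/// Model of a linear aggregate over one key's group: sum of f(v) * w
/// over the group's (value, weight) tuples.
fn linear_aggregate(group: &[(i64, i64)], f: impl Fn(&i64) -> i64) -> i64 {
    group.iter().map(|(v, w)| f(v) * w).sum()
}

fn main() {
    let f = |v: &i64| v * 10; // the per-value function
    let a = [(1, 2), (3, 1)];
    let b = [(3, -1), (5, 4)];
    let ab: Vec<_> = a.iter().chain(b.iter()).cloned().collect();
    // Linearity: aggregating the union of two Z-sets equals the sum of
    // the individual aggregates, which is what makes the incremental
    // evaluation strategy valid.
    assert_eq!(
        linear_aggregate(&ab, f),
        linear_aggregate(&a, f) + linear_aggregate(&b, f)
    );
}
```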

Source

pub fn stream_aggregate_linear_generic<F, A, O>(&self, f: F) -> Stream<C, O>
where Z: IndexedZSet, O: IndexedZSet<Key = Z::Key, DynK = Z::DynK, Val = A, DynV = DynData>, A: DBWeight + MulByRef<ZWeight, Output = A>, F: Fn(&Z::Val) -> A + Clone + 'static,

Like Self::stream_aggregate_linear, but can return any batch type.

Source

pub fn aggregate_generic<A, O>(&self, aggregator: A) -> Stream<C, O>
where Z: Batch<Time = ()> + Debug, Z::InnerBatch: Send, A: Aggregator<Z::Val, <C as WithClock>::Time, Z::R>, O: IndexedZSet<Key = Z::Key, DynK = Z::DynK, Val = DynData>, A::Output: Erase<O::DynV>, <Z::Key as Deserializable>::ArchivedDeser: Ord,

Like Self::dyn_aggregate, but can return any batch type.

Source

pub fn aggregate_linear<F, A>( &self, f: F, ) -> Stream<C, OrdIndexedZSet<Z::Key, A>>
where Z: IndexedZSet<DynK = DynData>, A: DBWeight + MulByRef<ZWeight, Output = A>, F: Fn(&Z::Val) -> A + Clone + 'static, <Z::Key as Deserializable>::ArchivedDeser: Ord,

A version of Self::aggregate optimized for linear aggregation functions.

This method only works for linear aggregation functions f, i.e., functions that satisfy f(a+b) = f(a) + f(b), where the first “+” is the Z-set union of the Z-sets composed of tuples a and b. This function will produce incorrect results if f is not linear. The input of aggregate_linear is an indexed Z-set, but the function f is only applied to the values, ignoring the keys.

Source

pub fn weigh<F, T>(&self, f: F) -> Stream<C, OrdWSet<Z::Key, T, DynWeight>>
where Z: IndexedZSet<DynK = DynData>, F: Fn(&Z::Key, &Z::Val) -> T + 'static, T: DBWeight + MulByRef<ZWeight, Output = T>,

Convert indexed Z-set Z into a Z-set where the weight of each key is computed as:

   __
   ╲
   ╱ f(k,v) * w
   ‾‾
(k,v,w) ∈ Z

Discards the values from the input.

This is a linear operator.
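The weight formula above can be sketched in plain Rust (a model of the semantics, not the DBSP API; `weigh_model` is a hypothetical name):

```rust
use std::collections::BTreeMap;

/// Model of `weigh`: collapse an indexed Z-set into a Z-set over keys,
/// where each key's weight is the sum of f(k, v) * w over its tuples.
fn weigh_model(
    zset: &[(&str, i64, i64)],
    f: impl Fn(&str, &i64) -> i64,
) -> BTreeMap<String, i64> {
    let mut out = BTreeMap::new();
    for &(k, v, w) in zset {
        *out.entry(k.to_string()).or_insert(0) += f(k, &v) * w;
    }
    // Z-set invariant: entries with weight 0 are not stored.
    out.retain(|_, w| *w != 0);
    out
}

fn main() {
    let data = [("a", 10, 1), ("a", 20, 2), ("b", 5, -1)];
    // Weigh each key by the sum of its values times their weights.
    let weighed = weigh_model(&data, |_k, v| *v);
    assert_eq!(weighed["a"], 50); // 10*1 + 20*2
    assert_eq!(weighed["b"], -5);
}
```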

Source

pub fn weigh_generic<F, T, O>(&self, f: F) -> Stream<C, O>
where Z: IndexedZSet, F: Fn(&Z::Key, &Z::Val) -> T + 'static, O: Batch<Key = Z::Key, DynK = Z::DynK, Val = (), DynV = DynUnit, Time = (), DynR = DynWeight>, T: DBWeight + MulByRef<ZWeight, Output = T>,

Like Self::weigh, but can return any batch type.

Source§

impl<K, V> Stream<RootCircuit, OrdIndexedZSet<K, V>>
where K: DBData, V: DBData,

Source

pub fn aggregate_linear_postprocess_retain_keys<F, A, OF, OV, TS, RK>( &self, waterline: &Stream<RootCircuit, TypedBox<TS, DynData>>, retain_key_func: RK, f: F, of: OF, ) -> Stream<RootCircuit, OrdIndexedZSet<K, OV>>
where A: DBWeight + MulByRef<ZWeight, Output = A>, OV: DBData, TS: DBData, RK: Fn(&K, &TS) -> bool + Clone + Send + Sync + 'static, F: Fn(&V) -> A + Clone + 'static, OF: Fn(A) -> OV + Clone + 'static,

Like aggregate_linear_postprocess, but additionally applies waterline to the internal integral

See aggregate_linear_retain_keys for details.

Source

pub fn aggregate_linear_postprocess_retain_keys_persistent<F, A, OF, OV, TS, RK>( &self, persistent_id: Option<&str>, waterline: &Stream<RootCircuit, TypedBox<TS, DynData>>, retain_key_func: RK, f: F, of: OF, ) -> Stream<RootCircuit, OrdIndexedZSet<K, OV>>
where A: DBWeight + MulByRef<ZWeight, Output = A>, OV: DBData, TS: DBData, RK: Fn(&K, &TS) -> bool + Clone + Send + Sync + 'static, F: Fn(&V) -> A + Clone + 'static, OF: Fn(A) -> OV + Clone + 'static,

Source

pub fn aggregate_linear_retain_keys<F, A, TS, RK>( &self, waterline: &Stream<RootCircuit, TypedBox<TS, DynData>>, retain_key_func: RK, f: F, ) -> Stream<RootCircuit, OrdIndexedZSet<K, A>>
where A: DBWeight + MulByRef<ZWeight, Output = A>, TS: DBData, RK: Fn(&K, &TS) -> bool + Clone + Send + Sync + 'static, F: Fn(&V) -> A + Clone + 'static,

Like aggregate_linear, but additionally applies waterline to the internal integral

The linear aggregate operator internally invokes the regular aggregation operator, which creates two integrals: for input and output streams. The latter stream is returned as output of this operator and can be GC’d using regular means (integrate_trace_retain_keys), but the former integral is internal to this operator. When aggregating a stream that has a waterline, use this function to GC keys in the internal stream that fall below the waterline.

Source§

impl<K1, V1> Stream<RootCircuit, OrdIndexedZSet<K1, V1>>
where K1: DBData, V1: DBData,

Source

pub fn asof_join<TS, F, TSF1, TSF2, V2, V>( &self, other: &Stream<RootCircuit, OrdIndexedZSet<K1, V2>>, join: F, ts_func1: TSF1, ts_func2: TSF2, ) -> Stream<RootCircuit, OrdZSet<V>>
where TS: DBData, V2: DBData, V: DBData, F: Fn(&K1, &V1, Option<&V2>) -> V + Clone + 'static, TSF1: Fn(&V1) -> TS + Clone + 'static, TSF2: Fn(&V2) -> TS + 'static,

Asof-join operator.

An asof-join operator combines records from two tables based on a common key (similar to an equi-join), as well as a timestamp. It assumes that both tables contain a timestamp column (ts). It matches each value v in self with the value in other that has the same key and the largest timestamp not exceeding v.ts. If there are multiple values with the same timestamp, the operator picks the largest one based on the ordering (according to Ord) on type V2. If there is no value v2 in other such that v2.ts <= v.ts, then the value None is used, i.e., this operator behaves as a left join.

The operator assumes that values in both collections are sorted by timestamp, i.e., impl Ord for V1 must satisfy ts_func1(v) < ts_func1(u) ==> v < u. Similarly for V2: ts_func2(v) < ts_func2(u) ==> v < u.

§Arguments
  • self - the left-hand side of the join.
  • other - the right-hand side of the join.
  • join - join function that maps a key, a value from self, and an optional value from other to an output value.
  • ts_func1 - extracts the value of the timestamp column from a record in self.
  • ts_func2 - extracts the value of the timestamp column from a record in other.
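The matching rule for a single left-hand record can be sketched in plain Rust (a model, not the DBSP API; `asof_match` is a hypothetical name):

```rust
/// Model of the asof-join matching rule for one left record: among right
/// records with the same key, pick the one with the largest timestamp not
/// exceeding the left record's timestamp, breaking timestamp ties by the
/// larger value; None if no such record exists (left-join behavior).
fn asof_match<'a>(
    left_ts: u64,
    right: &'a [(u64, &'a str)], // (timestamp, value) pairs for one key
) -> Option<&'a (u64, &'a str)> {
    right
        .iter()
        .filter(|(ts, _)| *ts <= left_ts)
        .max_by_key(|&&(ts, v)| (ts, v))
}

fn main() {
    let right = [(10, "x"), (20, "y"), (20, "z"), (30, "w")];
    // Largest ts <= 25 is 20; between "y" and "z", the larger value wins.
    assert_eq!(asof_match(25, &right), Some(&(20, "z")));
    // No right record at or below ts 5: the left record matches None.
    assert_eq!(asof_match(5, &right), None);
}
```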
Source§

impl<C, Z> Stream<C, Z>
where C: Circuit, Z: IndexedZSet<DynK = DynData>,

Source

pub fn average<A, F>(&self, f: F) -> Stream<C, OrdIndexedZSet<Z::Key, A>>
where A: DBData + From<ZWeight> + MulByRef<ZWeight, Output = A> + Div<Output = A> + GroupValue, F: Fn(&Z::Val) -> A + Clone + 'static, <Z::Key as Deserializable>::ArchivedDeser: Ord,

Incremental average aggregate.

This operator is a specialization of Stream::aggregate that for each key k in the input indexed Z-set computes the average value as:

   __                __
   ╲                 ╲
   ╱ v * w     /     ╱  w
   ‾‾                ‾‾
  (v,w) ∈ Z[k]      (v,w) ∈ Z[k]
§Design

Average is a quasi-linear aggregate, meaning that it can be efficiently computed as a composition of two linear aggregates: sum and count. The (sum, count) pair with pair-wise operations is also a linear aggregate and can be computed with a single Stream::aggregate_linear operator. The actual average is computed by applying the (sum, count) -> sum / count transformation to its output.
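The (sum, count) decomposition can be sketched in plain Rust over one key's group (a model, not the DBSP API; `average_model` is a hypothetical name):

```rust
/// Model of the quasi-linear average: compute the linear (sum, count)
/// pair over a group's (value, weight) tuples, then divide.
fn average_model(group: &[(i64, i64)]) -> Option<i64> {
    let (sum, count) = group
        .iter()
        .fold((0i64, 0i64), |(s, c), &(v, w)| (s + v * w, c + w));
    if count == 0 {
        None // empty group: no average to report
    } else {
        Some(sum / count)
    }
}

fn main() {
    // Values 10 (weight 2) and 40 (weight 1): (10*2 + 40*1) / 3 = 20.
    assert_eq!(average_model(&[(10, 2), (40, 1)]), Some(20));
    assert_eq!(average_model(&[]), None);
}
```

Because the (sum, count) pair is itself linear, it can be maintained incrementally with a single aggregate_linear operator; only the final division is non-linear.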

Source§

impl<K, V> Stream<RootCircuit, OrdIndexedZSet<K, V>>
where K: DBData, V: DBData,

Source

pub fn chain_aggregate<A, FInit, FUpdate>( &self, finit: FInit, fupdate: FUpdate, ) -> Stream<RootCircuit, OrdIndexedZSet<K, A>>
where A: DBData, FInit: Fn(&V, ZWeight) -> A + 'static, FUpdate: Fn(A, &V, ZWeight) -> A + 'static,

Aggregate whose value depends on the previous value of the aggregate and changes to the input collection.

Unlike general aggregates, such aggregates don’t require storing the integral of the input collection and hence require only O(1) memory per key.

Examples include min and max over append-only collections.

§Arguments
  • finit - returns the initial value of the aggregate.
  • fupdate - updates the aggregate given its previous value and a new element of the group.
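The finit/fupdate pattern can be sketched in plain Rust for a running minimum over an append-only stream (a model, not the DBSP API; `chain_min` is a hypothetical name):

```rust
/// Model of `chain_aggregate` for one key: the new aggregate depends only
/// on the previous aggregate and the newly arrived elements. Here: a
/// running minimum over an append-only stream of insertions.
fn chain_min(prev: Option<i64>, new_values: &[i64]) -> Option<i64> {
    new_values.iter().fold(prev, |acc, &v| match acc {
        None => Some(v),           // finit: the first element seeds the aggregate
        Some(m) => Some(m.min(v)), // fupdate: combine with the previous value
    })
}

fn main() {
    // Step 1: elements 5 and 3 arrive.
    let after_step1 = chain_min(None, &[5, 3]);
    assert_eq!(after_step1, Some(3));
    // Step 2: only O(1) state (the previous min) is carried forward.
    assert_eq!(chain_min(after_step1, &[4, 1]), Some(1));
}
```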
Source

pub fn chain_aggregate_persistent<A, FInit, FUpdate>( &self, persistent_id: Option<&str>, finit: FInit, fupdate: FUpdate, ) -> Stream<RootCircuit, OrdIndexedZSet<K, A>>
where A: DBData, FInit: Fn(&V, ZWeight) -> A + 'static, FUpdate: Fn(A, &V, ZWeight) -> A + 'static,

Source§

impl<C, T> Stream<C, T>
where C: Circuit, T: Trace<Time = ()> + Clone, T::InnerTrace: Clone,

Source

pub fn consolidate( &self, ) -> Stream<C, TypedBatch<T::Key, T::Val, T::R, <T::InnerTrace as DynTrace>::Batch>>

Consolidate a trace into a single batch.

Each element in the input stream is a trace, consisting of multiple batches of updates. This operator consolidates the trace into a single batch, which uses less memory and can be handled more efficiently by most operators than the trace.

This operator is typically attached to the output of a nested circuit computed as the sum of deltas across all iterations of the circuit. Once the iteration has converged (e.g., reached a fixed point), it is a good time to consolidate the output.

Source§

impl<C, Z> Stream<C, Z>
where C: Circuit, Z: IndexedZSet,

Source

pub fn controlled_key_filter<T, E, F, RF>( &self, threshold: &Stream<C, TypedBox<T, DynData>>, filter_func: F, report_func: RF, ) -> (Stream<C, Z>, Stream<C, OrdZSet<E>>)
where E: DBData, T: DBData, F: Fn(&T, &Z::Key) -> bool + 'static, RF: Fn(&T, &Z::Key, &Z::Val, ZWeight) -> E + 'static,

A controlled filter operator that discards input keys by comparing them to a scalar value in the threshold stream.

This operator compares each key in the input indexed Z-set against the current value in the threshold stream using filter_func and discards keys that don’t pass the filter.

Returns a pair of output streams.

Filtered output stream: (key, value, weight) tuples whose key passes the filter are sent to the first output stream unmodified.

Error stream: Keys that don't pass the filter are transformed by report_func and sent to the second output stream. More precisely, report_func is invoked for each (key, value, weight) tuple whose key does not pass the filter and returns a value of type E, which is sent with weight 1 to the error stream.
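
The split performed by this operator can be sketched in plain Rust (the types and the driving loop are simplified stand-ins; the operator evaluates the closures against the threshold stream's current value each cycle):

```rust
// Illustrative split performed by a controlled key filter: tuples whose key
// passes `filter_func` against the current threshold go to the filtered
// output stream unchanged; the rest are mapped through `report_func` into
// error records that are emitted with weight 1.
fn controlled_key_filter(
    threshold: u32,
    input: &[(u32, &str, i64)], // (key, value, weight)
) -> (Vec<(u32, String, i64)>, Vec<(String, i64)>) {
    let filter_func = |t: &u32, key: &u32| key <= t;
    let report_func =
        |t: &u32, key: &u32, val: &str, w: i64| format!("key {key} > {t}: ({val}, {w})");

    let mut passed = Vec::new();
    let mut errors = Vec::new();
    for &(key, val, w) in input {
        if filter_func(&threshold, &key) {
            passed.push((key, val.to_string(), w));
        } else {
            errors.push((report_func(&threshold, &key, val, w), 1));
        }
    }
    (passed, errors)
}

fn main() {
    let (ok, err) = controlled_key_filter(10, &[(5, "x", 1), (42, "y", 2)]);
    println!("{ok:?} {err:?}");
}
```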

Source

pub fn controlled_value_filter<T, E, F, RF>( &self, threshold: &Stream<C, TypedBox<T, DynData>>, filter_func: F, report_func: RF, ) -> (Stream<C, Z>, Stream<C, OrdZSet<E>>)
where E: DBData, T: DBData, F: Fn(&T, &Z::Key, &Z::Val) -> bool + 'static, RF: Fn(&T, &Z::Key, &Z::Val, ZWeight) -> E + 'static,

A controlled filter operator that discards input key/value pairs by comparing them to a scalar value in the threshold stream.

This operator compares each key/value pair in the input indexed Z-set against the current value in the threshold stream using filter_func and discards tuples that don’t pass the filter.

Returns a pair of output streams.

Filtered output stream: (key, value, weight) tuples that pass the check are sent to the first output stream unmodified.

Error stream: Tuples that don't pass the filter are transformed by report_func and sent to the second output stream. More precisely, report_func is invoked for each (key, value, weight) tuple that does not pass the filter and returns a value of type E, which is sent with weight 1 to the error stream.

Source

pub fn controlled_value_filter_typed<T, E, F, RF>( &self, threshold: &Stream<C, T>, filter_func: F, report_func: RF, ) -> (Stream<C, Z>, Stream<C, OrdZSet<E>>)
where E: DBData, T: DBData, F: Fn(&T, &Z::Key, &Z::Val) -> bool + 'static, RF: Fn(&T, &Z::Key, &Z::Val, ZWeight) -> E + 'static,

Like Self::controlled_value_filter, but values in the threshold stream are strongly typed instead of having type TypedBox.

Source§

impl<C, Z> Stream<C, Z>
where C: Circuit, Z: IndexedZSet, Z::InnerBatch: Send,

Source

pub fn stream_distinct(&self) -> Stream<C, Z>

Reduces input batches to one occurrence of each element.

For each input batch B, the operator produces an output batch that contains at most one occurrence of each tuple in B. Specifically, for each input tuple (key, value, weight) with weight > 0 the operator produces an output tuple (key, value, 1). Tuples with weight <= 0 are dropped.

Intuitively, the operator converts the input multiset into a set by eliminating duplicates.
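
The per-batch semantics can be sketched in plain Rust (key and value are collapsed to a single key for brevity; each input entry is a distinct tuple with its weight as multiplicity):

```rust
// Conceptual semantics of `stream_distinct` on one batch: every tuple with
// weight > 0 is emitted once with weight 1; tuples with weight <= 0 are
// dropped.
fn stream_distinct(batch: &[(&str, i64)]) -> Vec<(String, i64)> {
    batch
        .iter()
        .filter(|&&(_, w)| w > 0)          // drop non-positive weights
        .map(|&(k, _)| (k.to_string(), 1)) // clamp multiplicity to 1
        .collect()
}

fn main() {
    let batch = [("a", 3), ("b", -2), ("c", 1)];
    println!("{:?}", stream_distinct(&batch)); // [("a", 1), ("c", 1)]
}
```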

Source§

impl<C, B> Stream<C, B>
where C: Circuit, B: Clone + Send + Sync + 'static,

Source

pub fn dyn_accumulate_trace( &self, output_factories: &<Spine<<<C as WithClock>::Time as Timestamp>::TimedBatch<B>> as BatchReader>::Factories, batch_factories: &B::Factories, ) -> Stream<C, Spine<<<C as WithClock>::Time as Timestamp>::TimedBatch<B>>>
where B: Batch<Time = ()>,

Source

pub fn dyn_accumulate_trace_with_bound( &self, output_factories: &<Spine<<<C as WithClock>::Time as Timestamp>::TimedBatch<B>> as BatchReader>::Factories, batch_factories: &B::Factories, lower_key_bound: TraceBound<B::Key>, lower_val_bound: TraceBound<B::Val>, ) -> Stream<C, Spine<<<C as WithClock>::Time as Timestamp>::TimedBatch<B>>>
where B: Batch<Time = ()>,

Source

pub fn dyn_accumulate_integrate_trace_retain_keys<TS>( &self, bounds_stream: &Stream<C, Box<TS>>, retain_key_func: Box<dyn Fn(&TS) -> Filter<B::Key>>, )
where B: Batch<Time = ()>, TS: DataTrait + ?Sized, Box<TS>: Clone,

Source

pub fn dyn_accumulate_integrate_trace_retain_values<TS>( &self, bounds_stream: &Stream<C, Box<TS>>, retain_val_func: Box<dyn Fn(&TS) -> Filter<B::Val>>, )
where B: Batch<Time = ()>, TS: DataTrait + ?Sized, Box<TS>: Clone,

Source

pub fn dyn_accumulate_integrate_trace_retain_values_last_n<TS>( &self, bounds_stream: &Stream<C, Box<TS>>, retain_val_func: Box<dyn Fn(&TS) -> Filter<B::Val>>, n: usize, )
where B: Batch<Time = ()>, TS: DataTrait + ?Sized, Box<TS>: Clone,

Source

pub fn dyn_accumulate_integrate_trace_retain_values_top_n<TS>( &self, val_factory: &'static dyn Factory<B::Val>, bounds_stream: &Stream<C, Box<TS>>, retain_val_func: Box<dyn Fn(&TS) -> Filter<B::Val>>, n: usize, )
where B: Batch<Time = ()>, TS: DataTrait + ?Sized, Box<TS>: Clone,

Source

pub fn dyn_accumulate_integrate_trace_retain_values_bottom_n<TS>( &self, val_factory: &'static dyn Factory<B::Val>, bounds_stream: &Stream<C, Box<TS>>, retain_val_func: Box<dyn Fn(&TS) -> Filter<B::Val>>, n: usize, )
where B: Batch<Time = ()>, TS: DataTrait + ?Sized, Box<TS>: Clone,

Source

pub fn dyn_accumulate_integrate_trace( &self, factories: &B::Factories, ) -> Stream<C, Spine<B>>
where B: Batch<Time = ()>, Spine<B>: SizeOf,

Source

pub fn dyn_accumulate_integrate_trace_with_bound( &self, factories: &B::Factories, lower_key_bound: TraceBound<B::Key>, lower_val_bound: TraceBound<B::Val>, ) -> Stream<C, Spine<B>>
where B: Batch<Time = ()>, Spine<B>: SizeOf,

Source§

impl<C, B> Stream<C, Spine<B>>
where C: Circuit, B: Batch,

Source

pub fn accumulate_delay_trace(&self) -> Stream<C, SpineSnapshot<B>>

Returns the trace of self delayed by one clock cycle.

Source§

impl<C, B> Stream<C, B>
where C: Circuit, B: Batch,

Source§

impl<C, Z> Stream<C, Z>
where C: Circuit, Z: Clone + 'static,

Source

pub fn dyn_average<A, W>( &self, persistent_id: Option<&str>, factories: &AvgFactories<Z, A, W, C::Time>, f: Box<dyn Fn(&Z::Key, &Z::Val, &DynZWeight, &mut W)>, out_func: Box<dyn AggOutputFunc<W, A>>, ) -> Stream<C, OrdIndexedZSet<Z::Key, A>>
where A: DataTrait + ?Sized, W: DataTrait + ?Sized, Z: IndexedZSet,

Source§

impl Stream<RootCircuit, MonoIndexedZSet>

Source

pub fn dyn_chain_aggregate_mono( &self, persistent_id: Option<&str>, input_factories: &<MonoIndexedZSet as BatchReader>::Factories, output_factories: &<MonoIndexedZSet as BatchReader>::Factories, finit: Box<dyn Fn(&mut DynData, &DynData, ZWeight)>, fupdate: Box<dyn Fn(&mut DynData, &DynData, ZWeight)>, ) -> Stream<RootCircuit, MonoIndexedZSet>

Source§

impl<Z> Stream<RootCircuit, Z>
where Z: IndexedZSet,

Source

pub fn dyn_chain_aggregate<OZ>( &self, persistent_id: Option<&str>, input_factories: &Z::Factories, output_factories: &OZ::Factories, finit: Box<dyn Fn(&mut OZ::Val, &Z::Val, ZWeight)>, fupdate: Box<dyn Fn(&mut OZ::Val, &Z::Val, ZWeight)>, ) -> Stream<RootCircuit, OZ>
where OZ: IndexedZSet<Key = Z::Key>,

Source§

impl Stream<RootCircuit, MonoIndexedZSet>

Source§

impl Stream<NestedCircuit, MonoIndexedZSet>

Source§

impl<C, Z> Stream<C, Z>
where C: Circuit, Z: Clone + 'static,

Source

pub fn dyn_stream_aggregate<Acc, Out>( &self, factories: &StreamAggregateFactories<Z, OrdIndexedZSet<Z::Key, Out>>, aggregator: &dyn DynAggregator<Z::Val, (), Z::R, Accumulator = Acc, Output = Out>, ) -> Stream<C, OrdIndexedZSet<Z::Key, Out>>
where Z: IndexedZSet, Acc: DataTrait + ?Sized, Out: DataTrait + ?Sized,

Source

pub fn dyn_stream_aggregate_generic<Acc, Out, O>( &self, factories: &StreamAggregateFactories<Z, O>, aggregator: &dyn DynAggregator<Z::Val, (), Z::R, Accumulator = Acc, Output = Out>, ) -> Stream<C, O>
where Acc: DataTrait + ?Sized, Out: DataTrait + ?Sized, Z: Batch<Time = ()>, O: IndexedZSet<Key = Z::Key, Val = Out>,

Like Self::dyn_stream_aggregate, but can return any batch type.

Source

pub fn dyn_stream_aggregate_linear<A>( &self, factories: &StreamLinearAggregateFactories<Z, A, OrdIndexedZSet<Z::Key, A>>, f: Box<dyn Fn(&Z::Key, &Z::Val, &Z::R, &mut A)>, ) -> Stream<C, OrdIndexedZSet<Z::Key, A>>
where Z: IndexedZSet, A: WeightTrait + ?Sized,

Source

pub fn dyn_stream_aggregate_linear_generic<A, O>( &self, factories: &StreamLinearAggregateFactories<Z, A, O>, agg_func: Box<dyn Fn(&Z::Key, &Z::Val, &Z::R, &mut A)>, out_func: Box<dyn WeightedCountOutFunc<A, O::Val>>, ) -> Stream<C, O>
where Z: IndexedZSet, O: IndexedZSet<Key = Z::Key>, A: WeightTrait + ?Sized,

Like Self::dyn_stream_aggregate_linear, but can return any batch type.

Source

pub fn dyn_aggregate<Acc, Out>( &self, persistent_id: Option<&str>, factories: &IncAggregateFactories<Z, OrdIndexedZSet<Z::Key, Out>, C::Time>, aggregator: &dyn DynAggregator<Z::Val, <C as WithClock>::Time, Z::R, Accumulator = Acc, Output = Out>, ) -> Stream<C, OrdIndexedZSet<Z::Key, Out>>
where Acc: DataTrait + ?Sized, Out: DataTrait + ?Sized, Z: IndexedZSet,

Source

pub fn dyn_aggregate_generic<Acc, Out, O>( &self, persistent_id: Option<&str>, factories: &IncAggregateFactories<Z, O, C::Time>, aggregator: &dyn DynAggregator<Z::Val, <C as WithClock>::Time, Z::R, Accumulator = Acc, Output = Out>, ) -> Stream<C, O>
where Acc: DataTrait + ?Sized, Out: DataTrait + ?Sized, Z: Batch<Time = ()>, O: IndexedZSet<Key = Z::Key, Val = Out>,

Like Self::dyn_aggregate, but can return any batch type.

Source

pub fn dyn_aggregate_linear<A>( &self, persistent_id: Option<&str>, factories: &IncAggregateLinearFactories<Z, A, OrdIndexedZSet<Z::Key, A>, C::Time>, f: Box<dyn Fn(&Z::Key, &Z::Val, &Z::R, &mut A)>, ) -> Stream<C, OrdIndexedZSet<Z::Key, A>>
where Z: IndexedZSet, A: WeightTrait + ?Sized,

Source

pub fn dyn_aggregate_linear_generic<A, O>( &self, persistent_id: Option<&str>, factories: &IncAggregateLinearFactories<Z, A, O, C::Time>, agg_func: Box<dyn Fn(&Z::Key, &Z::Val, &Z::R, &mut A)>, out_func: Box<dyn WeightedCountOutFunc<A, O::Val>>, ) -> Stream<C, O>
where Z: IndexedZSet, O: IndexedZSet<Key = Z::Key>, A: WeightTrait + ?Sized,

Like Self::dyn_aggregate_linear, but can return any batch type.

Source

pub fn dyn_weigh<T>( &self, output_factories: &OrdWSetFactories<Z::Key, T>, f: Box<dyn Fn(&Z::Key, &Z::Val, &Z::R, &mut T)>, ) -> Stream<C, OrdWSet<Z::Key, T>>
where T: WeightTrait + ?Sized, Z: IndexedZSet,

Source

pub fn dyn_weigh_generic<O>( &self, output_factories: &O::Factories, f: Box<dyn Fn(&Z::Key, &Z::Val, &Z::R, &mut O::R)>, ) -> Stream<C, O>
where Z: IndexedZSet, O: Batch<Key = Z::Key, Val = DynUnit, Time = ()>,

Like Self::dyn_weigh, but can return any batch type.

Source§

impl Stream<RootCircuit, MonoIndexedZSet>

Source§

impl<Z> Stream<RootCircuit, Z>
where Z: Clone + 'static,

Source

pub fn dyn_aggregate_linear_retain_keys_generic<A, O, TS>( &self, persistent_id: Option<&str>, factories: &IncAggregateLinearFactories<Z, A, O, ()>, waterline: &Stream<RootCircuit, Box<TS>>, retain_key_func: Box<dyn Fn(&TS) -> Filter<Z::Key>>, agg_func: Box<dyn Fn(&Z::Key, &Z::Val, &Z::R, &mut A)>, out_func: Box<dyn WeightedCountOutFunc<A, O::Val>>, ) -> Stream<RootCircuit, O>
where Z: IndexedZSet<Time = ()>, O: IndexedZSet<Key = Z::Key>, A: WeightTrait + ?Sized, TS: DataTrait + ?Sized, Box<TS>: Clone,

Source§

impl Stream<RootCircuit, MonoIndexedZSet>

Source

pub fn dyn_asof_join_mono( &self, factories: &AsofJoinFactories<DynData, MonoIndexedZSet, MonoIndexedZSet, MonoZSet>, other: &Stream<RootCircuit, MonoIndexedZSet>, ts_func1: Box<dyn Fn(&DynData, &mut DynData)>, tscmp_func: Box<dyn Fn(&DynData, &DynData) -> Ordering>, valts_cmp_func: Box<dyn Fn(&DynData, &DynData) -> Ordering>, join_func: Box<AsofJoinFunc<DynData, DynData, DynData, DynData, DynUnit>>, ) -> Stream<RootCircuit, MonoZSet>

Source§

impl<I1> Stream<RootCircuit, I1>
where I1: IndexedZSet + Send,

Source

pub fn dyn_asof_join<TS, I2, V>( &self, factories: &AsofJoinFactories<TS, I1, I2, OrdZSet<V>>, other: &Stream<RootCircuit, I2>, ts_func1: Box<dyn Fn(&I1::Val, &mut TS)>, tscmp_func: Box<dyn Fn(&I1::Val, &I2::Val) -> Ordering>, valts_cmp_func: Box<dyn Fn(&I1::Val, &TS) -> Ordering>, join_func: Box<AsofJoinFunc<I1::Key, I1::Val, I2::Val, V, DynUnit>>, ) -> Stream<RootCircuit, OrdZSet<V>>
where TS: DataTrait + ?Sized, I2: IndexedZSet<Key = I1::Key>, V: DataTrait + ?Sized,

Source

pub fn dyn_asof_join_index<TS, I2, K, V>( &self, factories: &AsofJoinFactories<TS, I1, I2, OrdIndexedZSet<K, V>>, other: &Stream<RootCircuit, I2>, ts_func1: Box<dyn Fn(&I1::Val, &mut TS)>, tscmp_func: Box<dyn Fn(&I1::Val, &I2::Val) -> Ordering>, valts_cmp_func: Box<dyn Fn(&I1::Val, &TS) -> Ordering>, join_func: Box<AsofJoinFunc<I1::Key, I1::Val, I2::Val, K, V>>, ) -> Stream<RootCircuit, OrdIndexedZSet<K, V>>
where TS: DataTrait + ?Sized, I2: IndexedZSet<Key = I1::Key>, K: DataTrait + ?Sized, V: DataTrait + ?Sized,

Source

pub fn dyn_asof_join_generic<TS, I2, Z>( &self, factories: &AsofJoinFactories<TS, I1, I2, Z>, other: &Stream<RootCircuit, I2>, ts_func1: Box<dyn Fn(&I1::Val, &mut TS)>, tscmp_func: Box<dyn Fn(&I1::Val, &I2::Val) -> Ordering>, valts_cmp_func: Box<dyn Fn(&I1::Val, &TS) -> Ordering>, join_func: Box<AsofJoinFunc<I1::Key, I1::Val, I2::Val, Z::Key, Z::Val>>, ) -> Stream<RootCircuit, Z>
where TS: DataTrait + ?Sized, I2: IndexedZSet<Key = I1::Key>, Z: IndexedZSet,

Like Self::dyn_asof_join_index, but can return any indexed Z-set type.

Source§

impl<C, B> Stream<C, B>
where C: Circuit, B: IndexedZSet,

Source

pub fn dyn_accumulate_trace_balanced( &self, trace_factories: &<TimedSpine<B, C> as BatchReader>::Factories, batch_factories: &B::Factories, ) -> (Stream<C, Option<Spine<B>>>, Stream<C, TimedSpine<B, C>>)

Shard and integrate self using dynamic sharding policy selected by the balancer.

Combines exchange, accumulator, and integral into a single operator. In steady state, applies the selected sharding policy to the input stream. When the sharding policy changes, retracts the previous content of the integral and re-distributes it using the new policy.

§Returns

The output of the accumulator and the integral.

§Circuit
                                     │
                                     │self
                                     ▼
                        ┌─────────────────────────┐
             ┌─────────►│RebalancingExchangeSender│◄───┐
             │          └────────────┬────────────┘    │
             │                       .                 │delayed_acc
             │                       .                 │
             │                       ▼                 │
             │               ┌────────────────┐      ┌──┐
             │               │ExchangeReceiver│      │Z1│
             │               └────────────────┘      └──┘
             │                       │                 ▲
             │                       │                 │
             │                       ▼                 │
             │             ┌──────────────────────┐    │
             │             │RebalancingAccumulator│────┘
delayed_trace│             └──────────────────────┘
             │                       │
             │                       │────────────────────┐
             ├──────────────────┐    │                    │
             │                  ▼    ▼                    │
     ┌───────┴─────────┐    ┌─────────────────────┐       │
     │AccumulateZ1Trace│◄───┤AccumulateTraceAppend│       │
     └─────────────────┘    └────────┬────────────┘       │
                                     │                    │
                                     │trace               │accumulator_stream
                                     │                    │
                                     ▼                    ▼

The two feedback streams (delayed_acc and delayed_trace) are used by RebalancingExchangeSender to retract previously accumulated content of the integral when the sharding policy changes.

Source§

impl<C, B> Stream<C, B>
where C: Circuit, B: Batch,

Source

pub fn dyn_accumulate_with_feedback_stream( &self, factories: &B::Factories, ) -> (Stream<C, Option<Spine<B>>>, Rc<RefCell<RebalancingAccumulatorInner<B>>>, RefStreamValue<EmptyCheckpoint<Vec<Arc<B>>>>)

Source§

impl<C, B> Stream<C, B>
where C: Circuit, B: Send + 'static,

Source

pub fn dyn_gather( &self, factories: &B::Factories, receiver_worker: usize, ) -> Stream<C, B>
where B: Batch<Time = ()> + Send,

Source§

impl<C, IB> Stream<C, IB>
where C: Circuit, IB: BatchReader<Time = ()> + Clone,

Source

pub fn dyn_shard(&self, factories: &IB::Factories) -> Stream<C, IB>
where IB: Batch + Send,

Source

pub fn dyn_shard_workers( &self, workers: Range<usize>, factories: &IB::Factories, ) -> Stream<C, IB>
where IB: Batch + Send,

Source

pub fn dyn_shard_generic<OB>( &self, factories: &OB::Factories, ) -> Option<Stream<C, OB>>
where OB: Batch<Key = IB::Key, Val = IB::Val, Time = (), R = IB::R> + Send,

Like Self::dyn_shard, but can assemble the results into any output batch type OB.

Returns None when the circuit is not running inside a multithreaded runtime or is running in a runtime with a single worker thread.

Source

pub fn dyn_shard_generic_workers<OB>( &self, workers: Range<usize>, factories: &OB::Factories, ) -> Option<Stream<C, OB>>
where OB: Batch<Key = IB::Key, Val = IB::Val, Time = (), R = IB::R> + Send,

Like Self::dyn_shard, but can assemble the results into any output batch type OB.

Returns None when the circuit is not running inside a multithreaded runtime or is running in a runtime with a single worker thread.

Source§

impl<C, K, V> Stream<C, Vec<Box<DynPairs<K, V>>>>
where C: Circuit, K: DataTrait + ?Sized, V: DataTrait + ?Sized,

Source

pub fn dyn_shard_pairs( &self, pairs_factory: &'static dyn Factory<DynPairs<K, V>>, ) -> Stream<C, Vec<Box<DynPairs<K, V>>>>

Source§

impl<C, T> Stream<C, T>
where C: Circuit, T: 'static,

Source

pub fn mark_sharded(&self) -> Self

Marks the data within the current stream as sharded, meaning that all further calls to .shard() will have no effect.

This must only be used on streams of values that are properly sharded across workers; otherwise, the dataflow will yield incorrect results.

Source

pub fn has_sharded_version(&self) -> bool

Returns true if a sharded version of the current stream exists.

Source

pub fn try_sharded_version(&self) -> Self

Returns the sharded version of the stream if it exists (which may be the stream itself or the result of applying the shard operator to it). Otherwise, returns self.

Source

pub fn try_unsharded_version(&self) -> Self

Returns the unsharded version of the stream if it exists, and otherwise self.

Source

pub fn is_sharded(&self) -> bool

Returns true if this stream is sharded.

Source

pub fn mark_sharded_if<C2, U>(&self, input: &Stream<C2, U>)
where C2: Circuit, U: 'static,

Marks self as sharded if input has a sharded version of itself.

Source§

impl<C, T> Stream<C, T>
where C: Circuit, T: Trace<Time = ()> + Clone,

Source

pub fn dyn_consolidate( &self, factories: &<T::Batch as BatchReader>::Factories, ) -> Stream<C, T::Batch>

Source§

impl Stream<RootCircuit, MonoIndexedZSet>

Source§

impl Stream<RootCircuit, MonoZSet>

Source§

impl<C, Z> Stream<C, Z>
where C: Circuit, Z: ZBatch<Time = ()>,

Source

pub fn dyn_controlled_key_filter<T, E>( &self, factories: ControlledFilterFactories<Z, E>, threshold: &Stream<C, Box<T>>, filter_func: Box<dyn Fn(&T, &Z::Key) -> bool>, report_func: Box<dyn Fn(&T, &Z::Key, &Z::Val, ZWeight, &mut E)>, ) -> (Stream<C, Z>, Stream<C, OrdZSet<E>>)
where T: DataTrait + ?Sized, Box<T>: Clone, E: DataTrait + ?Sized,

Source

pub fn dyn_controlled_value_filter<T, E>( &self, factories: ControlledFilterFactories<Z, E>, threshold: &Stream<C, Box<T>>, filter_func: Box<dyn Fn(&T, &Z::Key, &Z::Val) -> bool>, report_func: Box<dyn Fn(&T, &Z::Key, &Z::Val, ZWeight, &mut E)>, ) -> (Stream<C, Z>, Stream<C, OrdZSet<E>>)
where T: DataTrait + ?Sized, Box<T>: Clone, E: DataTrait + ?Sized,

Source§

impl<C, Z> Stream<C, Z>
where C: Circuit, Z: IndexedZSet,

Source

pub fn dyn_weighted_count( &self, persistent_id: Option<&str>, factories: &IncAggregateLinearFactories<Z, Z::R, OrdIndexedZSet<Z::Key, Z::R>, C::Time>, ) -> Stream<C, OrdIndexedZSet<Z::Key, Z::R>>

Source

pub fn dyn_weighted_count_generic<A, O>( &self, persistent_id: Option<&str>, factories: &IncAggregateLinearFactories<Z, Z::R, O, C::Time>, out_func: Box<dyn WeightedCountOutFunc<Z::R, A>>, ) -> Stream<C, O>
where O: IndexedZSet<Key = Z::Key, Val = A>, A: DataTrait + ?Sized,

Like Self::dyn_weighted_count, but can return any batch type.

Source

pub fn dyn_distinct_count( &self, persistent_id: Option<&str>, factories: &DistinctCountFactories<Z, OrdIndexedZSet<Z::Key, Z::R>, C::Time>, ) -> Stream<C, OrdIndexedZSet<Z::Key, Z::R>>
where Z: Send,

Source

pub fn dyn_distinct_count_generic<A, O>( &self, persistent_id: Option<&str>, factories: &DistinctCountFactories<Z, O, C::Time>, out_func: Box<dyn WeightedCountOutFunc<Z::R, A>>, ) -> Stream<C, O>
where A: DataTrait + ?Sized, O: IndexedZSet<Key = Z::Key, Val = A>, Z: Send,

Like Self::dyn_distinct_count, but can return any batch type.

Source

pub fn dyn_stream_weighted_count( &self, factories: &StreamLinearAggregateFactories<Z, Z::R, OrdIndexedZSet<Z::Key, Z::R>>, ) -> Stream<C, OrdIndexedZSet<Z::Key, Z::R>>

Source

pub fn dyn_stream_weighted_count_generic<A, O>( &self, factories: &StreamLinearAggregateFactories<Z, Z::R, O>, out_func: Box<dyn WeightedCountOutFunc<Z::R, A>>, ) -> Stream<C, O>
where A: DataTrait + ?Sized, O: IndexedZSet<Key = Z::Key, Val = A>,

Like Self::dyn_stream_weighted_count, but can return any batch type.

Source

pub fn dyn_stream_distinct_count( &self, factories: &StreamDistinctCountFactories<Z, OrdIndexedZSet<Z::Key, Z::R>>, ) -> Stream<C, OrdIndexedZSet<Z::Key, Z::R>>
where Z: Send,

Source

pub fn dyn_stream_distinct_count_generic<A, O>( &self, factories: &StreamDistinctCountFactories<Z, O>, out_func: Box<dyn WeightedCountOutFunc<Z::R, A>>, ) -> Stream<C, O>
where A: DataTrait + ?Sized, O: IndexedZSet<Key = Z::Key, Val = A>, Z: Send,

Like Self::dyn_distinct_count, but can return any batch type.

Source§

impl<C, D> Stream<C, D>
where C: Circuit, D: 'static,

Source

pub fn mark_distinct(&self) -> Self

Marks the data within the current stream as distinct, meaning that all further calls to .distinct() will have no effect.

This must only be used on streams whose integral contains elements with unit weights only; otherwise, the dataflow will yield incorrect results.

Source

pub fn has_distinct_version(&self) -> bool

Returns true if a distinct version of the current stream exists.

Source

pub fn is_distinct(&self) -> bool

Returns true if the current stream is known to be distinct.

Source

pub fn try_distinct_version(&self) -> Self

Returns the distinct version of the stream if it exists. Otherwise, returns self.

Source

pub fn mark_distinct_if<C2, D2>(&self, input: &Stream<C2, D2>)
where C2: Circuit, D2: 'static,

Marks self as distinct if input has a distinct version of itself.

Source§

impl Stream<RootCircuit, MonoIndexedZSet>

Source§

impl Stream<RootCircuit, MonoZSet>

Source§

impl Stream<NestedCircuit, MonoIndexedZSet>

Source§

impl Stream<NestedCircuit, MonoZSet>

Source§

impl<C, Z> Stream<C, Z>
where C: Circuit,

Source

pub fn dyn_stream_distinct( &self, input_factories: &Z::Factories, ) -> Stream<C, Z>
where Z: IndexedZSet + Send,

Source

pub fn dyn_hash_distinct( &self, factories: &HashDistinctFactories<Z, C::Time>, ) -> Stream<C, Z>

Source

pub fn dyn_distinct( &self, factories: &DistinctFactories<Z, C::Time>, ) -> Stream<C, Z>
where Z: IndexedZSet + Send,

Source

pub fn dyn_distinct_inner( &self, factories: &DistinctFactories<Z, C::Time>, ) -> Stream<C, Z>
where Z: IndexedZSet + Send,

Source§

impl Stream<RootCircuit, MonoIndexedZSet>

Source

pub fn dyn_filter_mono( &self, filter_func: Box<dyn Fn((&DynData, &DynData)) -> bool>, ) -> Self

Source

pub fn dyn_map_mono( &self, output_factories: &MonoZSetFactories, map_func: Box<dyn Fn((&DynData, &DynData), &mut DynPair<DynData, DynUnit>)>, ) -> Stream<RootCircuit, MonoZSet>

Source

pub fn dyn_map_index_mono( &self, output_factories: &MonoIndexedZSetFactories, map_func: Box<dyn Fn((&DynData, &DynData), &mut DynPair<DynData, DynData>)>, ) -> Stream<RootCircuit, MonoIndexedZSet>

Source

pub fn dyn_flat_map_mono( &self, output_factories: &MonoZSetFactories, func: Box<dyn FnMut((&DynData, &DynData), &mut dyn FnMut(&mut DynData, &mut DynUnit))>, ) -> Stream<RootCircuit, MonoZSet>

Source

pub fn dyn_flat_map_index_mono( &self, output_factories: &MonoIndexedZSetFactories, func: Box<dyn FnMut((&DynData, &DynData), &mut dyn FnMut(&mut DynData, &mut DynData))>, ) -> Stream<RootCircuit, MonoIndexedZSet>

Source§

impl Stream<RootCircuit, MonoZSet>

Source

pub fn dyn_filter_mono( &self, filter_func: Box<dyn Fn(&DynData) -> bool>, ) -> Self

Source

pub fn dyn_map_mono( &self, output_factories: &MonoZSetFactories, map_func: Box<dyn Fn(&DynData, &mut DynPair<DynData, DynUnit>)>, ) -> Stream<RootCircuit, MonoZSet>

Source

pub fn dyn_map_index_mono( &self, output_factories: &MonoIndexedZSetFactories, map_func: Box<dyn Fn(&DynData, &mut DynPair<DynData, DynData>)>, ) -> Stream<RootCircuit, MonoIndexedZSet>

Source

pub fn dyn_flat_map_mono( &self, output_factories: &MonoZSetFactories, func: Box<dyn FnMut(&DynData, &mut dyn FnMut(&mut DynData, &mut DynUnit))>, ) -> Stream<RootCircuit, MonoZSet>

Source

pub fn dyn_flat_map_index_mono( &self, output_factories: &MonoIndexedZSetFactories, func: Box<dyn FnMut(&DynData, &mut dyn FnMut(&mut DynData, &mut DynData))>, ) -> Stream<RootCircuit, MonoIndexedZSet>

Source§

impl Stream<NestedCircuit, MonoIndexedZSet>

Source

pub fn dyn_filter_mono( &self, filter_func: Box<dyn Fn((&DynData, &DynData)) -> bool>, ) -> Self

Source

pub fn dyn_map_mono( &self, output_factories: &MonoZSetFactories, map_func: Box<dyn Fn((&DynData, &DynData), &mut DynPair<DynData, DynUnit>)>, ) -> Stream<NestedCircuit, MonoZSet>

Source

pub fn dyn_map_index_mono( &self, output_factories: &MonoIndexedZSetFactories, map_func: Box<dyn Fn((&DynData, &DynData), &mut DynPair<DynData, DynData>)>, ) -> Stream<NestedCircuit, MonoIndexedZSet>

Source

pub fn dyn_flat_map_mono( &self, output_factories: &MonoZSetFactories, func: Box<dyn FnMut((&DynData, &DynData), &mut dyn FnMut(&mut DynData, &mut DynUnit))>, ) -> Stream<NestedCircuit, MonoZSet>

Source

pub fn dyn_flat_map_index_mono( &self, output_factories: &MonoIndexedZSetFactories, func: Box<dyn FnMut((&DynData, &DynData), &mut dyn FnMut(&mut DynData, &mut DynData))>, ) -> Stream<NestedCircuit, MonoIndexedZSet>

Source§

impl Stream<NestedCircuit, MonoZSet>

Source

pub fn dyn_filter_mono( &self, filter_func: Box<dyn Fn(&DynData) -> bool>, ) -> Self

Source

pub fn dyn_map_mono( &self, output_factories: &MonoZSetFactories, map_func: Box<dyn Fn(&DynData, &mut DynPair<DynData, DynUnit>)>, ) -> Stream<NestedCircuit, MonoZSet>

Source

pub fn dyn_map_index_mono( &self, output_factories: &MonoIndexedZSetFactories, map_func: Box<dyn Fn(&DynData, &mut DynPair<DynData, DynData>)>, ) -> Stream<NestedCircuit, MonoIndexedZSet>

Source

pub fn dyn_flat_map_mono( &self, output_factories: &MonoZSetFactories, func: Box<dyn FnMut(&DynData, &mut dyn FnMut(&mut DynData, &mut DynUnit))>, ) -> Stream<NestedCircuit, MonoZSet>

Source

pub fn dyn_flat_map_index_mono( &self, output_factories: &MonoIndexedZSetFactories, func: Box<dyn FnMut(&DynData, &mut dyn FnMut(&mut DynData, &mut DynData))>, ) -> Stream<NestedCircuit, MonoIndexedZSet>

Source§

impl<C: Circuit, B: DynFilterMap> Stream<C, B>

Source

pub fn dyn_filter( &self, filter_func: Box<dyn Fn(B::DynItemRef<'_>) -> bool>, ) -> Self

Source

pub fn dyn_map<K: DataTrait + ?Sized>( &self, output_factories: &OrdWSetFactories<K, B::R>, map_func: Box<dyn Fn(B::DynItemRef<'_>, &mut DynPair<K, DynUnit>)>, ) -> Stream<C, OrdWSet<K, B::R>>

Source

pub fn dyn_map_index<K: DataTrait + ?Sized, V: DataTrait + ?Sized>( &self, output_factories: &OrdIndexedWSetFactories<K, V, B::R>, map_func: Box<dyn Fn(B::DynItemRef<'_>, &mut DynPair<K, V>)>, ) -> Stream<C, OrdIndexedWSet<K, V, B::R>>

Behaves as Self::dyn_map followed by index, but is more efficient. Assembles output records into VecIndexedZSet batches.

Source

pub fn dyn_map_generic<O>( &self, output_factories: &O::Factories, map_func: Box<dyn Fn(B::DynItemRef<'_>, &mut DynPair<O::Key, O::Val>)>, ) -> Stream<C, O>
where O: Batch<Time = (), R = B::R>,

Like Self::dyn_map_index, but can return any batch type.

Source

pub fn dyn_flat_map<K: DataTrait + ?Sized>( &self, output_factories: &OrdWSetFactories<K, B::R>, func: Box<dyn FnMut(B::DynItemRef<'_>, &mut dyn FnMut(&mut K, &mut DynUnit))>, ) -> Stream<C, OrdWSet<K, B::R>>

Source

pub fn dyn_flat_map_index<K: DataTrait + ?Sized, V: DataTrait + ?Sized>( &self, output_factories: &OrdIndexedWSetFactories<K, V, B::R>, func: Box<dyn FnMut(B::DynItemRef<'_>, &mut dyn FnMut(&mut K, &mut V))>, ) -> Stream<C, OrdIndexedWSet<K, V, B::R>>

Source

pub fn dyn_flat_map_generic<O>( &self, output_factories: &O::Factories, func: Box<dyn FnMut(B::DynItemRef<'_>, &mut dyn FnMut(&mut O::Key, &mut O::Val))>, ) -> Stream<C, O>
where O: Batch<Time = (), R = B::R> + Clone + 'static,

Like Self::dyn_flat_map_index, but can return any batch type.

Source§

impl<B> Stream<RootCircuit, B>
where B: IndexedZSet + Send,

Source

pub fn dyn_lag<OV>( &self, persistent_id: Option<&str>, factories: &LagFactories<B, OV>, offset: isize, project: Box<dyn Fn(Option<&B::Val>, &mut OV)>, ) -> Stream<RootCircuit, OrdIndexedZSet<B::Key, DynPair<B::Val, OV>>>
where OV: DataTrait + ?Sized,

Source§

impl Stream<RootCircuit, MonoIndexedZSet>

Source

pub fn dyn_lag_custom_order_mono( &self, persistent_id: Option<&str>, factories: &LagCustomOrdFactories<MonoIndexedZSet, DynData, DynData, DynData>, offset: isize, encode: Box<dyn Fn(&DynData, &mut DynData)>, project: Box<dyn Fn(Option<&DynData>, &mut DynData)>, decode: Box<dyn Fn(&DynData, &DynData, &mut DynData)>, ) -> Stream<RootCircuit, MonoIndexedZSet>

Source§

impl<B, K, V> Stream<RootCircuit, B>
where B: IndexedZSet<Key = K, Val = V> + Send, K: DataTrait + ?Sized, V: DataTrait + ?Sized,

Source

pub fn dyn_lag_custom_order<V2, VL, OV>( &self, persistent_id: Option<&str>, factories: &LagCustomOrdFactories<B, V2, VL, OV>, offset: isize, encode: Box<dyn Fn(&V, &mut V2)>, project: Box<dyn Fn(Option<&V2>, &mut VL)>, decode: Box<dyn Fn(&V2, &VL, &mut OV)>, ) -> Stream<RootCircuit, OrdIndexedZSet<K, OV>>
where V2: DataTrait + ?Sized, VL: DataTrait + ?Sized, OV: DataTrait + ?Sized, B: for<'a> DynFilterMap<DynItemRef<'a> = (&'a K, &'a V)>,

Source§

impl<B> Stream<RootCircuit, B>
where B: IndexedZSet + Send,

Source

pub fn dyn_topk_asc( &self, persistent_id: Option<&str>, factories: &TopKFactories<B>, k: usize, ) -> Stream<RootCircuit, OrdIndexedZSet<B::Key, B::Val>>

Source

pub fn dyn_topk_desc( &self, persistent_id: Option<&str>, factories: &TopKFactories<B>, k: usize, ) -> Stream<RootCircuit, OrdIndexedZSet<B::Key, B::Val>>

Source§

impl Stream<RootCircuit, MonoIndexedZSet>

Source

pub fn dyn_topk_custom_order_mono( &self, persistent_id: Option<&str>, factories: &TopKCustomOrdFactories<DynData, DynData, DynData, DynZWeight>, k: usize, encode: Box<dyn Fn(&DynData, &mut DynData)>, decode: Box<dyn Fn(&DynData) -> &DynData>, ) -> Self

Source

pub fn dyn_topk_rank_custom_order_mono( &self, persistent_id: Option<&str>, factories: &TopKRankCustomOrdFactories<DynData, DynData, DynData>, k: usize, encode: Box<dyn Fn(&DynData, &mut DynData)>, rank_eq_func: Box<dyn Fn(&DynData, &DynData) -> bool>, output_func: Box<dyn Fn(i64, &DynData, &mut DynData)>, ) -> Stream<RootCircuit, MonoIndexedZSet>

Source

pub fn dyn_topk_dense_rank_custom_order_mono( &self, persistent_id: Option<&str>, factories: &TopKRankCustomOrdFactories<DynData, DynData, DynData>, k: usize, encode: Box<dyn Fn(&DynData, &mut DynData)>, rank_eq_func: Box<dyn Fn(&DynData, &DynData) -> bool>, output_func: Box<dyn Fn(i64, &DynData, &mut DynData)>, ) -> Stream<RootCircuit, MonoIndexedZSet>

Source

pub fn dyn_topk_row_number_custom_order_mono( &self, persistent_id: Option<&str>, factories: &TopKRankCustomOrdFactories<DynData, DynData, DynData>, k: usize, encode: Box<dyn Fn(&DynData, &mut DynData)>, output_func: Box<dyn Fn(i64, &DynData, &mut DynData)>, ) -> Stream<RootCircuit, MonoIndexedZSet>

Source§

impl<K, V> Stream<RootCircuit, OrdIndexedZSet<K, V>>
where K: DataTrait + ?Sized, V: DataTrait + ?Sized,

Source

pub fn dyn_topk_custom_order<V2>( &self, persistent_id: Option<&str>, factories: &TopKCustomOrdFactories<K, V, V2, DynZWeight>, k: usize, encode: Box<dyn Fn(&V, &mut V2)>, decode: Box<dyn Fn(&V2) -> &V>, ) -> Self
where V2: DataTrait + ?Sized,

Source

pub fn dyn_topk_rank_custom_order<V2, OV>( &self, persistent_id: Option<&str>, factories: &TopKRankCustomOrdFactories<K, V2, OV>, k: usize, encode: Box<dyn Fn(&V, &mut V2)>, rank_eq_func: Box<dyn Fn(&V2, &V2) -> bool>, output_func: Box<dyn Fn(i64, &V2, &mut OV)>, ) -> Stream<RootCircuit, OrdIndexedZSet<K, OV>>
where V2: DataTrait + ?Sized, OV: DataTrait + ?Sized,

Source

pub fn dyn_topk_dense_rank_custom_order<V2, OV>( &self, persistent_id: Option<&str>, factories: &TopKRankCustomOrdFactories<K, V2, OV>, k: usize, encode: Box<dyn Fn(&V, &mut V2)>, rank_eq_func: Box<dyn Fn(&V2, &V2) -> bool>, output_func: Box<dyn Fn(i64, &V2, &mut OV)>, ) -> Stream<RootCircuit, OrdIndexedZSet<K, OV>>
where V2: DataTrait + ?Sized, OV: DataTrait + ?Sized,

Source

pub fn dyn_topk_row_number_custom_order<V2, OV>( &self, persistent_id: Option<&str>, factories: &TopKRankCustomOrdFactories<K, V2, OV>, k: usize, encode: Box<dyn Fn(&V, &mut V2)>, output_func: Box<dyn Fn(i64, &V2, &mut OV)>, ) -> Stream<RootCircuit, OrdIndexedZSet<K, OV>>
where V2: DataTrait + ?Sized, OV: DataTrait + ?Sized,

Source§

impl<C, CI> Stream<C, CI>
where CI: Clone + 'static, C: Circuit,

Source

pub fn index<K, V>( &self, output_factories: &<OrdIndexedWSet<K, V, CI::R> as BatchReader>::Factories, ) -> Stream<C, OrdIndexedWSet<K, V, CI::R>>
where K: DataTrait + ?Sized, V: DataTrait + ?Sized, CI: BatchReader<Key = DynPair<K, V>, Val = DynUnit, Time = ()>,

Convert input batches to an indexed representation.

Converts input batches whose key type is a (key,value) tuple into an indexed Z-set using the first element of each tuple as a key and the second element as the value. The indexed Z-set representation is used as input to various join and aggregation operators.
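As a sketch of what indexing computes (a plain-Rust model using `BTreeMap`, not the DBSP API; the `index` function and the concrete types here are illustrative assumptions):

```rust
use std::collections::BTreeMap;

/// Model of `index`: a Z-set over (key, value) pairs, given as
/// ((key, value), weight) tuples, becomes a nested map key -> value -> weight.
fn index(zset: &[((u32, &'static str), i64)]) -> BTreeMap<u32, BTreeMap<&'static str, i64>> {
    let mut out: BTreeMap<u32, BTreeMap<&'static str, i64>> = BTreeMap::new();
    for &((k, v), w) in zset {
        // Group by the first tuple element; accumulate weights per value.
        *out.entry(k).or_default().entry(v).or_default() += w;
    }
    out
}
```

The real operator produces an OrdIndexedWSet with the same key/value/weight structure, which join and aggregation operators can then consume.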

Source

pub fn index_generic<CO>( &self, output_factories: &CO::Factories, ) -> Stream<C, CO>
where CI: BatchReader<Key = DynPair<CO::Key, CO::Val>, Val = DynUnit, Time = (), R = CO::R>, CO: Batch<Time = ()>,

Like index, but can return any indexed Z-set type, not just OrdIndexedZSet.

Source

pub fn index_with<K, V, F>( &self, output_factories: &<OrdIndexedWSet<K, V, CI::R> as BatchReader>::Factories, index_func: F, ) -> Stream<C, OrdIndexedWSet<K, V, CI::R>>
where CI: BatchReader<Time = (), Val = DynUnit>, F: Fn(&CI::Key, &mut DynPair<K, V>) + Clone + 'static, K: DataTrait + ?Sized, V: DataTrait + ?Sized,

Convert input batches to an indexed representation with the help of a user-provided function that maps a key in the input Z-set into an output (key, value) pair.

Converts input batches into an indexed Z-set by applying index_func to each key in the input batch and using the first element of the resulting tuple as a key and the second element as the value. The indexed Z-set representation is used as input to join and aggregation operators.

Source

pub fn index_with_generic<CO, F>( &self, index_func: F, output_factories: &CO::Factories, ) -> Stream<C, CO>
where CI: BatchReader<Time = (), Val = DynUnit>, CO: Batch<Time = (), R = CI::R>, F: Fn(&CI::Key, &mut DynPair<CO::Key, CO::Val>) + Clone + 'static,

Like index_with, but can return any indexed Z-set type, not just OrdIndexedZSet.

Source§

impl<K, V, U> Stream<RootCircuit, Vec<Box<DynPairs<K, dyn UpdateTrait<V, U>>>>>
where K: DataTrait + ?Sized, V: DataTrait + ?Sized, U: DataTrait + ?Sized,

Source

pub fn input_upsert<B>( &self, persistent_id: Option<&str>, factories: &InputUpsertFactories<B, U>, patch_func: PatchFunc<V, U>, ) -> Stream<RootCircuit, B>
where B: IndexedZSet<Key = K, Val = V>,

Convert an input stream of upserts into a stream of updates to a relation.

The input stream carries changes to a key/value map in the form of upserts. An upsert assigns a new value to a key (or deletes the key from the map) without explicitly removing the old value, if any. The operator converts upserts into batches of updates, which is the input format of most DBSP operators.

The operator assumes that the input vector is sorted by key; unlike the Stream::upsert operator, however, it allows the vector to contain multiple updates per key. Updates are applied one by one in order, and the output of the operator reflects the cumulative effect of the updates. Additionally, unlike Stream::upsert, which supports only insertions (which overwrite the entire value) and deletions, this operator also supports updates that modify the contents of a value, e.g., overwriting some of its fields. The type argument U defines the format of such modifications, and the patch_func function applies an update of type U to a value of type V.

This is a stateful operator that internally maintains the trace of the collection.
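The patch semantics can be sketched in plain Rust (a behavioral model, not the DBSP API; the `Upsert` enum and `input_upsert` function are hypothetical stand-ins, with an integer increment playing the role of the U patch type):

```rust
use std::collections::BTreeMap;

/// Hypothetical upsert commands: insert/overwrite, delete, or patch.
enum Upsert {
    Insert(i64),
    Delete,
    Patch(i64), // the `U` payload: here, an increment applied by `patch_func`
}

/// Model of `input_upsert`: apply key-sorted upserts (possibly several per
/// key, in order) to the current map, returning the resulting Z-set delta
/// as (key, value, weight) tuples.
fn input_upsert(
    state: &mut BTreeMap<&'static str, i64>,
    upserts: &[(&'static str, Upsert)],
) -> Vec<(&'static str, i64, i64)> {
    let mut delta = Vec::new();
    for (k, cmd) in upserts {
        let old = state.get(k).copied();
        let new = match cmd {
            Upsert::Insert(v) => Some(*v),
            Upsert::Delete => None,
            // `patch_func`: modify the old value (a no-op if the key is absent).
            Upsert::Patch(d) => old.map(|v| v + d),
        };
        if old == new {
            continue;
        }
        if let Some(v) = old {
            delta.push((*k, v, -1)); // retract the old value
            state.remove(k);
        }
        if let Some(v) = new {
            delta.push((*k, v, 1)); // insert the new value
            state.insert(*k, v);
        }
    }
    delta
}
```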

Source

pub fn input_upsert_with_waterline<B, W, E>( &self, persistent_id: Option<&str>, factories: &InputUpsertWithWaterlineFactories<B, U, E>, patch_func: PatchFunc<V, U>, init_waterline: Box<dyn Fn() -> Box<W>>, extract_ts: Box<dyn Fn(&B::Key, &B::Val, &mut W)>, least_upper_bound: LeastUpperBoundFunc<W>, filter_func: Box<dyn Fn(&W, &B::Key, &B::Val) -> bool>, report_func: Box<dyn Fn(&W, &B::Key, &B::Val, ZWeight, &mut E)>, ) -> (Stream<RootCircuit, B>, Stream<RootCircuit, OrdZSet<E>>, Stream<RootCircuit, Box<W>>)
where B: IndexedZSet<Key = K, Val = V>, W: DataTrait + Checkpoint + ?Sized, E: DataTrait + ?Sized, Box<W>: Checkpoint + Clone + NumEntries + Rkyv,

Source§

impl<C, I1> Stream<C, I1>
where C: Circuit,

Source

pub fn dyn_stream_join<I2, V>( &self, factories: &StreamJoinFactories<I1, I2, OrdZSet<V>>, other: &Stream<C, I2>, join: JoinFunc<I1::Key, I1::Val, I2::Val, V, DynUnit>, ) -> Stream<C, OrdZSet<V>>
where I1: IndexedZSet, I2: IndexedZSet<Key = I1::Key>, V: DataTrait + ?Sized,

Source

pub fn dyn_stream_join_generic<I2, Z>( &self, factories: &StreamJoinFactories<I1, I2, Z>, other: &Stream<C, I2>, join: JoinFunc<I1::Key, I1::Val, I2::Val, Z::Key, Z::Val>, ) -> Stream<C, Z>
where I1: IndexedZSet, I2: IndexedZSet<Key = I1::Key>, Z: IndexedZSet,

Like Self::dyn_stream_join, but can return any batch type.

Source

pub fn dyn_monotonic_stream_join<I2, Z>( &self, factories: &StreamJoinFactories<I1, I2, Z>, other: &Stream<C, I2>, join: JoinFunc<I1::Key, I1::Val, I2::Val, Z::Key, DynUnit>, ) -> Stream<C, Z>
where I1: IndexedZSet, I2: IndexedZSet<Key = I1::Key>, Z: ZSet,

Source

pub fn dyn_stream_antijoin<I2>( &self, factories: &StreamAntijoinFactories<I1, OrdZSet<I2::Key>>, other: &Stream<C, I2>, ) -> Self
where I1: IndexedZSet, I2: IndexedZSet<Key = I1::Key> + DynFilterMap,

Source§

impl Stream<RootCircuit, MonoIndexedZSet>

Source§

impl Stream<NestedCircuit, MonoIndexedZSet>

Source§

impl<C, I1> Stream<C, I1>
where C: Circuit, I1: IndexedZSet,

Source

pub fn dyn_join<I2, V>( &self, factories: &JoinFactories<I1, I2, C::Time, OrdZSet<V>>, other: &Stream<C, I2>, join_funcs: TraceJoinFuncs<I1::Key, I1::Val, I2::Val, V, DynUnit>, ) -> Stream<C, OrdZSet<V>>
where I2: IndexedZSet<Key = I1::Key>, V: DataTrait + ?Sized,

Source

pub fn dyn_join_index<I2, K, V>( &self, factories: &JoinFactories<I1, I2, C::Time, OrdIndexedZSet<K, V>>, other: &Stream<C, I2>, join_funcs: TraceJoinFuncs<I1::Key, I1::Val, I2::Val, K, V>, ) -> Stream<C, OrdIndexedZSet<K, V>>
where I2: IndexedZSet<Key = I1::Key>, K: DataTrait + ?Sized, V: DataTrait + ?Sized,

Source

pub fn dyn_join_generic<I2, Z>( &self, factories: &JoinFactories<I1, I2, C::Time, Z>, other: &Stream<C, I2>, join_funcs: TraceJoinFuncs<I1::Key, I1::Val, I2::Val, Z::Key, Z::Val>, ) -> Stream<C, Z>
where I2: IndexedZSet<Key = I1::Key>, Z: IndexedZSet,

Like Self::dyn_join_index, but can return any indexed Z-set type.

Source

pub fn dyn_antijoin<I2>( &self, factories: &AntijoinFactories<I1, OrdZSet<I2::Key>, C::Time>, other: &Stream<C, I2>, ) -> Stream<C, I1>
where I2: IndexedZSet<Key = I1::Key> + DynFilterMap + Send, Box<I1::Key>: Clone, Box<I1::Val>: Clone,

Source§

impl<I1> Stream<RootCircuit, I1>
where I1: IndexedZSet,

Source

pub fn dyn_join_generic_balanced<I2, Z>( &self, factories: &JoinFactories<I1, I2, (), Z>, other: &Stream<RootCircuit, I2>, join_funcs: TraceJoinFuncs<I1::Key, I1::Val, I2::Val, Z::Key, Z::Val>, ) -> Stream<RootCircuit, Z>
where I2: IndexedZSet<Key = I1::Key>, Z: IndexedZSet,

Source§

impl<C, Z> Stream<C, Z>
where C: Circuit, Z: IndexedZSet,

Source

pub fn dyn_outer_join<Z2, O>( &self, factories: &OuterJoinFactories<Z, Z2, C::Time, O>, other: &Stream<C, Z2>, join_funcs: TraceJoinFuncs<Z::Key, Z::Val, Z2::Val, O, DynUnit>, left_func: Box<dyn FnMut(<Z as DynFilterMap>::DynItemRef<'_>, &mut dyn FnMut(&mut O, &mut DynUnit))>, right_func: Box<dyn FnMut(<Z2 as DynFilterMap>::DynItemRef<'_>, &mut dyn FnMut(&mut O, &mut DynUnit))>, ) -> Stream<C, OrdZSet<O>>
where Z: DynFilterMap + Send, Z2: IndexedZSet<Key = Z::Key> + Send + DynFilterMap, O: DataTrait + ?Sized, Box<Z::Key>: Clone, Box<Z::Val>: Clone, Box<Z2::Val>: Clone,

Source

pub fn dyn_outer_join_default<Z2, O>( &self, factories: &OuterJoinFactories<Z, Z2, C::Time, O>, other: &Stream<C, Z2>, join_funcs: TraceJoinFuncs<Z::Key, Z::Val, Z2::Val, O, DynUnit>, ) -> Stream<C, OrdZSet<O>>
where Z: for<'a> DynFilterMap<DynItemRef<'a> = (&'a <Z as BatchReader>::Key, &'a <Z as BatchReader>::Val)> + Send, Z2: IndexedZSet<Key = Z::Key> + Send + for<'a> DynFilterMap<DynItemRef<'a> = (&'a <Z2 as BatchReader>::Key, &'a <Z2 as BatchReader>::Val)>, O: DataTrait + ?Sized, Box<Z::Key>: Clone, Box<Z::Val>: Clone, Box<Z2::Val>: Clone,

Like Stream::dyn_outer_join, but uses default value for the missing side of the join.

Source§

impl<C, I1> Stream<C, I1>
where C: Circuit,

Source

pub fn dyn_stream_join_range<I2, V>( &self, factories: &StreamJoinRangeFactories<I2, OrdZSet<V>>, other: &Stream<C, I2>, range_func: Box<dyn Fn(&I1::Key, &mut I2::Key, &mut I2::Key)>, join_func: Box<dyn Fn(&I1::Key, &I1::Val, &I2::Key, &I2::Val, &mut dyn FnMut(&mut V, &mut DynUnit))>, ) -> Stream<C, OrdZSet<V>>

Source

pub fn dyn_stream_join_range_index<K, V, I2>( &self, factories: &StreamJoinRangeFactories<I2, OrdIndexedZSet<K, V>>, other: &Stream<C, I2>, range_func: Box<dyn Fn(&I1::Key, &mut I2::Key, &mut I2::Key)>, join_func: Box<dyn Fn(&I1::Key, &I1::Val, &I2::Key, &I2::Val, &mut dyn FnMut(&mut K, &mut V))>, ) -> Stream<C, OrdIndexedZSet<K, V>>

Source

pub fn dyn_stream_join_range_generic<I2, O>( &self, factories: &StreamJoinRangeFactories<I2, O>, other: &Stream<C, I2>, range_func: Box<dyn Fn(&I1::Key, &mut I2::Key, &mut I2::Key)>, join_func: Box<dyn Fn(&I1::Key, &I1::Val, &I2::Key, &I2::Val, &mut dyn FnMut(&mut O::Key, &mut O::Val))>, ) -> Stream<C, O>

Like Self::dyn_stream_join_range, but can return any indexed Z-set type.

Source§

impl Stream<RootCircuit, MonoIndexedZSet>

Source§

impl Stream<NestedCircuit, MonoIndexedZSet>

Source§

impl<K, V> Stream<RootCircuit, OrdIndexedZSet<K, V>>
where K: DataTrait + ?Sized, V: DataTrait + ?Sized,

Source

pub fn dyn_star_join<O>( &self, factories: &StarJoinFactories<OrdIndexedZSet<K, V>, O, ()>, others: &[(Self, bool)], join_funcs: &[StarJoinFunc<RootCircuit, OrdIndexedZSet<K, V>, O::Key, O::Val>], ) -> Stream<RootCircuit, O>
where O: IndexedZSet,

Source§

impl<C, K, V> Stream<C, OrdIndexedZSet<K, V>>
where C: Circuit, K: DataTrait + ?Sized, V: DataTrait + ?Sized,

Source

pub fn dyn_inner_star_join<O>( &self, factories: &StarJoinFactories<OrdIndexedZSet<K, V>, O, C::Time>, others: &[Self], join_funcs: &[StarJoinFunc<C, OrdIndexedZSet<K, V>, O::Key, O::Val>], ) -> Stream<C, O>
where O: IndexedZSet,

Source§

impl<B> Stream<RootCircuit, B>
where B: IndexedZSet,

Source

pub fn dyn_neighborhood( &self, factories: &NeighborhoodFactories<B>, neighborhood_descr: &NeighborhoodDescrStream<B::Key, B::Val>, ) -> NeighborhoodStream<B::Key, B::Val>

Returns a small contiguous range of rows (DynNeighborhood) of the input table.

This operator helps visualize the contents of the input table in a UI. The UI client may not have enough throughput/memory to store the entire table and will instead limit its state to a small range of rows that fits on the screen. We specify such a range, or neighborhood, in terms of its center (or “anchor”) and the number of rows preceding and following the anchor (see NeighborhoodDescr). The user may be interested in a static snapshot of the neighborhood or in a changing view. Both modes are supported by this operator, driven by the stream of neighborhood descriptors. The output of the operator is a stream of DynNeighborhoods.

NOTE: This operator assumes that the integral of the input stream does not contain negative weights (which should normally be the case) and may produce incorrect outputs otherwise.

§Arguments
  • self - a stream of changes to an indexed Z-set.

  • neighborhood_descr - contains the neighborhood descriptor to evaluate at every clock tick. Set to None to disable the operator (it will output an empty neighborhood).

§Output

Outputs a stream of changes to the neighborhood.

The output neighborhood will contain rows with indexes between -descr.before and descr.after - 1. Row 0 is the anchor row, i.e., the first row in the input stream greater than or equal to descr.anchor. If there is no such row (i.e., all rows in the input stream are smaller than the anchor), then the neighborhood will only contain negative indexes.

The first index in the neighborhood may be greater than -descr.before if the input stream doesn’t contain enough rows preceding the specified anchor. The last index may be smaller than descr.after - 1 if the input stream doesn’t contain descr.after rows following the anchor point.
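The indexing described above can be modeled over a sorted slice (a sketch, not the operator's API; `neighborhood` and its arguments stand in for the fields of NeighborhoodDescr):

```rust
/// Model of the neighborhood computation: given rows sorted ascending,
/// return (index, row) pairs with the anchor row at index 0.
fn neighborhood(rows: &[u32], anchor: u32, before: usize, after: usize) -> Vec<(isize, u32)> {
    // Row 0 is the first row >= anchor; rows before it get negative indexes.
    let pos = rows.partition_point(|&r| r < anchor);
    let start = pos.saturating_sub(before);
    let end = (pos + after).min(rows.len());
    rows[start..end]
        .iter()
        .enumerate()
        .map(|(i, &r)| ((start + i) as isize - pos as isize, r))
        .collect()
}
```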

Source§

impl Stream<RootCircuit, MonoIndexedZSet>

Source§

impl<K, V1> Stream<RootCircuit, OrdIndexedZSet<K, V1>>
where K: DataTrait + ?Sized, V1: DataTrait + ?Sized,

Source

pub fn dyn_left_join<V2, Z>( &self, factories: &JoinFactories<OrdIndexedZSet<K, V1>, OrdIndexedZSet<K, V2>, (), Z>, other: &Stream<RootCircuit, OrdIndexedZSet<K, V2>>, join_funcs: TraceJoinFuncs<K, V1, V2, Z::Key, Z::Val>, ) -> Stream<RootCircuit, Z>
where V2: DataTrait + ?Sized, Z: IndexedZSet,

Left-join self with other.

For any key on the left that is not present on the right, passes the default value of V2 to the join function. This assumes that the underlying concrete type is Option<T> and that the other stream does not contain any None values.
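A sketch of the join semantics this implies (plain Rust; `left_join` and the concrete types are illustrative, with right-side values of type Option<i64> so the default value, None, marks missing keys):

```rust
use std::collections::BTreeMap;

/// Model of a left join where keys missing on the right are joined against
/// the default value of the right-hand value type (here None).
fn left_join(
    left: &BTreeMap<u32, &'static str>,
    right: &BTreeMap<u32, Option<i64>>,
) -> Vec<(u32, &'static str, Option<i64>)> {
    left.iter()
        .map(|(&k, &v)| (k, v, right.get(&k).copied().flatten()))
        .collect()
}
```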

Source§

impl<K, V1> Stream<RootCircuit, OrdIndexedZSet<K, V1>>
where K: DataTrait + ?Sized, V1: DataTrait + ?Sized,

Source

pub fn dyn_left_join_balanced<V2, Z>( &self, factories: &JoinFactories<OrdIndexedZSet<K, V1>, OrdIndexedZSet<K, V2>, (), Z>, other: &Stream<RootCircuit, OrdIndexedZSet<K, V2>>, join_funcs: TraceJoinFuncs<K, V1, V2, Z::Key, Z::Val>, ) -> Stream<RootCircuit, Z>
where V2: DataTrait + ?Sized, Z: IndexedZSet,

Source§

impl<B> Stream<RootCircuit, B>

Source§

impl<K, V> Stream<RootCircuit, OrdIndexedZSet<K, V>>
where K: DataTrait + ?Sized, V: DataTrait + ?Sized,

Source

pub fn dyn_saturate( &self, factories: &<OrdIndexedZSet<K, V> as BatchReader>::Factories, ) -> Stream<RootCircuit, Option<SpineSnapshot<OrdIndexedZSet<K, V>>>>

Saturate the input stream by adding a ghost (k, None) tuple for each key not present in the trace of the input stream.

This is an auxiliary operator used to implement incremental outer joins. The idea is to convert, e.g., a left join into an inner join by simulating that every key in the right side of the join is always present. We do this without materializing the entire universe of keys by:

  1. Providing a modified cursor over the integral of the right side, which returns missing keys on demand (see SaturatingCursor).
  2. Augmenting the change stream with the missing records (this operator), specifically:
    • When the first value for a key is added to the collection, we inject a retraction of the ghost tuple ((k, None), -1).
    • When the last value for a key is removed from the collection, we inject an addition of the ghost tuple ((k, None), +1).

Caveat: In order to faithfully implement the saturated stream, we’d have to output ghost tuples for all missing keys during the first step. We don’t do this, relying on the fact that the left join essentially ignores the first delta in the right stream (by joining with the delayed integral of the left side, which is always empty in the first step, since it’s the output of a delay operator).
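The injection rules in step 2 can be modeled with per-key value counts (a sketch only; `saturate` and its delta format of (key, net change in value count) are hypothetical):

```rust
use std::collections::BTreeMap;

/// Model of the ghost-tuple bookkeeping: emit a ghost update whenever a key
/// transitions between "no values" and "some values".
fn saturate(
    counts: &mut BTreeMap<u32, i64>,
    delta: &[(u32, i64)], // (key, net change to the number of values for key)
) -> Vec<((u32, Option<()>), i64)> {
    let mut ghosts = Vec::new();
    for &(k, d) in delta {
        let c = counts.entry(k).or_insert(0);
        let was_empty = *c == 0;
        *c += d;
        if was_empty && *c > 0 {
            ghosts.push(((k, None), -1)); // first value arrived: retract the ghost (k, None)
        } else if !was_empty && *c == 0 {
            ghosts.push(((k, None), 1)); // last value removed: re-add the ghost (k, None)
        }
    }
    ghosts
}
```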

Source

pub fn dyn_saturate_balanced( &self, factories: &<OrdIndexedZSet<K, V> as BatchReader>::Factories, ) -> Stream<RootCircuit, Option<SpineSnapshot<OrdIndexedZSet<K, V>>>>

Source§

impl<C, Pairs> Stream<C, Pairs>
where C: Circuit,

Source

pub fn dyn_semijoin_stream<Keys, Out>( &self, factories: &SemijoinStreamFactories<Pairs, Keys, Out>, keys: &Stream<C, Keys>, ) -> Stream<C, Out>
where Pairs: IndexedZSet + Send, Keys: ZSet<Key = Pairs::Key> + Send, Out: ZSet<Key = DynPair<Pairs::Key, Pairs::Val>>,

Source§

impl<Z> Stream<RootCircuit, Z>
where Z: Clone + 'static,

Source

pub fn partitioned_tree_aggregate<TS, V, Acc, Out>( &self, persistent_id: Option<&str>, factories: &PartitionedTreeAggregateFactories<TS, V, Z, OrdIndexedZSet<<Z as BatchReader>::Key, DynPair<DynDataTyped<Prefix<TS>>, dyn TreeNodeTrait<TS, Acc>>>, Acc>, aggregator: &dyn DynAggregator<V, (), DynZWeight, Accumulator = Acc, Output = Out>, ) -> Stream<RootCircuit, OrdIndexedZSet<Z::Key, DynPair<DynDataTyped<Prefix<TS>>, dyn TreeNodeTrait<TS, Acc>>>>

Given a batch of updates to a partitioned time series stream, computes a stream of updates to its partitioned radix tree.

This is a building block for higher-level operators such as Stream::partitioned_rolling_aggregate.

Source

pub fn partitioned_tree_aggregate_generic<TS, V, Acc, Out, O>( &self, persistent_id: Option<&str>, factories: &PartitionedTreeAggregateFactories<TS, V, Z, O, Acc>, aggregator: &dyn DynAggregator<V, (), DynZWeight, Accumulator = Acc, Output = Out>, ) -> Stream<RootCircuit, O>
where Z: PartitionedIndexedZSet<DynDataTyped<TS>, V> + SizeOf + Send, Acc: DataTrait + ?Sized, Out: DataTrait + ?Sized, TS: DBData + PrimInt, V: DataTrait + ?Sized, O: PartitionedRadixTreeBatch<TS, Acc, Key = Z::Key>,

Like Self::partitioned_tree_aggregate, but can return any partitioned batch type.

This is a building block for higher-level operators such as Stream::partitioned_rolling_aggregate.

Source§

impl<C, Z, TS> Stream<C, Z>
where C: Circuit, Z: IndexedZSet<Key = DynDataTyped<TS>> + SizeOf + Send, TS: DBData + PrimInt,

Source

pub fn tree_aggregate<Acc, Out>( &self, persistent_id: Option<&str>, factories: &TreeAggregateFactories<TS, Z, OrdIndexedZSet<DynDataTyped<Prefix<TS>>, dyn TreeNodeTrait<TS, Acc>>, Acc>, aggregator: &dyn DynAggregator<Z::Val, (), Z::R, Accumulator = Acc, Output = Out>, ) -> Stream<C, OrdIndexedZSet<DynDataTyped<Prefix<TS>>, dyn TreeNodeTrait<TS, Acc>>>
where Acc: DataTrait + ?Sized, Out: DataTrait + ?Sized,

Given a batch of updates to a time series stream, computes a stream of updates to its radix tree.

This is intended as a building block for higher-level operators.

§Limitations

Unlike Stream::partitioned_tree_aggregate(), this operator is currently not parallelized, performing all work in a single worker thread.

Source

pub fn tree_aggregate_generic<Acc, Out, O>( &self, persistent_id: Option<&str>, factories: &TreeAggregateFactories<TS, Z, O, Acc>, aggregator: &dyn DynAggregator<Z::Val, (), DynZWeight, Accumulator = Acc, Output = Out>, ) -> Stream<C, O>
where Acc: DataTrait + ?Sized, Out: DataTrait + ?Sized, O: RadixTreeBatch<TS, Acc>,

Like Self::tree_aggregate, but can return any batch type.

Source§

impl<B> Stream<RootCircuit, B>
where B: IndexedZSet,

Source

pub fn dyn_partitioned_rolling_aggregate_with_waterline<PK, TS, V, Acc, Out>( &self, persistent_id: Option<&str>, factories: &PartitionedRollingAggregateWithWaterlineFactories<PK, TS, V, Acc, Out, B>, waterline: &Stream<RootCircuit, Box<DynDataTyped<TS>>>, partition_func: Box<dyn PartitionFunc<B::Val, PK, V>>, aggregator: &dyn DynAggregator<V, (), B::R, Accumulator = Acc, Output = Out>, range: RelRange<TS>, ) -> OrdPartitionedOverStream<PK, DynDataTyped<TS>, Out>
where B: IndexedZSet + for<'a> DynFilterMap<DynItemRef<'a> = (&'a <B as BatchReader>::Key, &'a <B as BatchReader>::Val)>, Box<B::Key>: Clone, PK: DataTrait + ?Sized, TS: DBData + UnsignedPrimInt + Erase<B::Key>, V: DataTrait + ?Sized, Acc: DataTrait + ?Sized, Out: DataTrait + ?Sized,

Source

pub fn dyn_partitioned_rolling_aggregate<PK, TS, V, Acc, Out>( &self, persistent_id: Option<&str>, factories: &PartitionedRollingAggregateFactories<TS, V, Acc, Out, OrdPartitionedIndexedZSet<PK, DynDataTyped<TS>, V>, OrdPartitionedIndexedZSet<PK, DynDataTyped<TS>, DynOpt<Out>>>, partition_func: Box<dyn PartitionFunc<B::Val, PK, V>>, aggregator: &dyn DynAggregator<V, (), B::R, Accumulator = Acc, Output = Out>, range: RelRange<TS>, ) -> OrdPartitionedOverStream<PK, DynDataTyped<TS>, Out>
where B: IndexedZSet + for<'a> DynFilterMap<DynItemRef<'a> = (&'a <B as BatchReader>::Key, &'a <B as BatchReader>::Val)>, Acc: DataTrait + ?Sized, Out: DataTrait + ?Sized, PK: DataTrait + ?Sized, TS: DBData + UnsignedPrimInt + Erase<B::Key>, V: DataTrait + ?Sized,

Like Self::dyn_partitioned_rolling_aggregate, but can return any batch type.

Source

pub fn dyn_partitioned_rolling_aggregate_linear<PK, TS, V, A, O>( &self, persistent_id: Option<&str>, factories: &PartitionedRollingAggregateLinearFactories<TS, V, O, A, OrdPartitionedIndexedZSet<PK, DynDataTyped<TS>, V>, OrdPartitionedIndexedZSet<PK, DynDataTyped<TS>, DynOpt<O>>>, partition_func: Box<dyn PartitionFunc<B::Val, PK, V>>, f: Box<dyn WeighFunc<V, B::R, A>>, output_func: Box<dyn AggOutputFunc<A, O>>, range: RelRange<TS>, ) -> OrdPartitionedOverStream<PK, DynDataTyped<TS>, O>
where B: IndexedZSet + for<'a> DynFilterMap<DynItemRef<'a> = (&'a <B as BatchReader>::Key, &'a <B as BatchReader>::Val)>, PK: DataTrait + ?Sized, TS: DBData + UnsignedPrimInt + Erase<B::Key>, V: DataTrait + ?Sized, A: WeightTrait + ?Sized, O: DataTrait + ?Sized,

Source

pub fn dyn_partitioned_rolling_average<PK, TS, V, W>( &self, persistent_id: Option<&str>, factories: &PartitionedRollingAverageFactories<TS, V, W, OrdPartitionedIndexedZSet<PK, DynDataTyped<TS>, V>, OrdPartitionedIndexedZSet<PK, DynDataTyped<TS>, DynOpt<V>>>, partition_func: Box<dyn PartitionFunc<B::Val, PK, V>>, f: Box<dyn WeighFunc<V, B::R, W>>, out_func: Box<dyn AggOutputFunc<W, V>>, range: RelRange<TS>, ) -> OrdPartitionedOverStream<PK, DynDataTyped<TS>, V>
where B: IndexedZSet + for<'a> DynFilterMap<DynItemRef<'a> = (&'a <B as BatchReader>::Key, &'a <B as BatchReader>::Val)>, PK: DataTrait + ?Sized, TS: DBData + UnsignedPrimInt + Erase<B::Key>, V: DataTrait + ?Sized, W: WeightTrait + ?Sized,

Source§

impl Stream<RootCircuit, MonoIndexedZSet>

Source

pub fn dyn_waterline_mono( &self, persistent_id: Option<&str>, init: Box<dyn Fn() -> Box<DynData>>, extract_ts: Box<dyn Fn(&DynData, &DynData, &mut DynData)>, least_upper_bound: LeastUpperBoundFunc<DynData>, ) -> Stream<RootCircuit, Box<DynData>>

Source§

impl Stream<RootCircuit, MonoZSet>

Source

pub fn dyn_waterline_mono( &self, persistent_id: Option<&str>, init: Box<dyn Fn() -> Box<DynData>>, extract_ts: Box<dyn Fn(&DynData, &DynUnit, &mut DynData)>, least_upper_bound: LeastUpperBoundFunc<DynData>, ) -> Stream<RootCircuit, Box<DynData>>

Source§

impl<B> Stream<RootCircuit, B>
where B: BatchReader + Clone + 'static,

Source

pub fn dyn_waterline_monotonic<TS>( &self, init: Box<dyn Fn() -> Box<TS>>, waterline_func: Box<dyn Fn(&B::Key, &mut TS)>, ) -> Stream<RootCircuit, Box<TS>>
where TS: Checkpoint + DataTrait + ?Sized, Box<TS>: Clone + SizeOf + NumEntries + Rkyv,

Source§

impl<B> Stream<RootCircuit, B>
where B: BatchReader + Clone + 'static,

Source

pub fn dyn_waterline<TS>( &self, persistent_id: Option<&str>, init: Box<dyn Fn() -> Box<TS>>, extract_ts: Box<dyn Fn(&B::Key, &B::Val, &mut TS)>, least_upper_bound: LeastUpperBoundFunc<TS>, ) -> Stream<RootCircuit, Box<TS>>
where TS: Checkpoint + DataTrait + ?Sized, Box<TS>: Clone + SizeOf + NumEntries + Rkyv,

Source§

impl Stream<RootCircuit, MonoIndexedZSet>

Source§

impl<C, B> Stream<C, B>
where C: Circuit, B: IndexedZSet, Box<B::Key>: Clone,

Source

pub fn dyn_window( &self, factories: &B::Factories, inclusive: (bool, bool), bounds: &Stream<C, (Box<B::Key>, Box<B::Key>)>, ) -> Stream<C, B>

Source§

impl<C, B> Stream<C, B>
where C: Circuit, B: Clone + Send + Sync + 'static,

Source

pub fn dyn_trace( &self, output_factories: &<TimedSpine<B, C> as BatchReader>::Factories, batch_factories: &B::Factories, ) -> Stream<C, TimedSpine<B, C>>
where B: Batch<Time = ()>,

Source

pub fn dyn_trace_with_bound( &self, output_factories: &<TimedSpine<B, C> as BatchReader>::Factories, batch_factories: &B::Factories, lower_key_bound: TraceBound<B::Key>, lower_val_bound: TraceBound<B::Val>, ) -> Stream<C, TimedSpine<B, C>>
where B: Batch<Time = ()>,

Source

pub fn dyn_integrate_trace_retain_keys<TS>( &self, bounds_stream: &Stream<C, Box<TS>>, retain_key_func: Box<dyn Fn(&TS) -> Filter<B::Key>>, )
where B: Batch<Time = ()>, TS: DataTrait + ?Sized, Box<TS>: Clone,

Source

pub fn dyn_integrate_trace_retain_values<TS>( &self, bounds_stream: &Stream<C, Box<TS>>, retain_val_func: Box<dyn Fn(&TS) -> Filter<B::Val>>, )
where B: Batch<Time = ()>, TS: DataTrait + ?Sized, Box<TS>: Clone,

Source

pub fn dyn_integrate_trace_retain_values_last_n<TS>( &self, bounds_stream: &Stream<C, Box<TS>>, retain_val_func: Box<dyn Fn(&TS) -> Filter<B::Val>>, n: usize, )
where B: Batch<Time = ()>, TS: DataTrait + ?Sized, Box<TS>: Clone,

Source

pub fn dyn_integrate_trace_retain_values_top_n<TS>( &self, val_factory: &'static dyn Factory<B::Val>, bounds_stream: &Stream<C, Box<TS>>, retain_val_func: Box<dyn Fn(&TS) -> Filter<B::Val>>, n: usize, )
where B: Batch<Time = ()>, TS: DataTrait + ?Sized, Box<TS>: Clone,

Source

pub fn dyn_integrate_trace( &self, factories: &B::Factories, ) -> Stream<C, Spine<B>>
where B: Batch<Time = ()>, Spine<B>: SizeOf,

Source

pub fn dyn_integrate_trace_with_bound( &self, factories: &B::Factories, lower_key_bound: TraceBound<B::Key>, lower_val_bound: TraceBound<B::Val>, ) -> Stream<C, Spine<B>>
where B: Batch<Time = ()>, Spine<B>: SizeOf,

Source§

impl<C, B> Stream<C, Spine<B>>
where C: Circuit, B: Batch,

Source

pub fn delay_trace(&self) -> Stream<C, SpineSnapshot<B>>

Source§

impl<C, K> Stream<C, Box<DynPairs<K, DynOpt<DynUnit>>>>
where K: DataTrait + ?Sized, C: Circuit,

Source

pub fn update_set<B>( &self, persistent_id: Option<&str>, factories: &UpdateSetFactories<<C as WithClock>::Time, B>, ) -> Stream<C, B>
where B: ZSet<Key = K>,

Convert a stream of inserts and deletes into a stream of Z-set updates.

The input stream carries changes to a set in the form of insert and delete commands. Set semantics imply that inserting an element that already exists in the set is a no-op; likewise, deleting an element that is not in the set is a no-op. This operator converts these commands into batches of updates to a Z-set, which is the input format of most DBSP operators.

The operator assumes that the input vector is sorted by key.

This is a stateful operator that internally maintains the trace of the collection.
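A minimal sketch of this behavior in plain Rust (a model only, not the DBSP API; `update_set` here is a hypothetical helper and `bool` stands in for the insert/delete command):

```rust
use std::collections::BTreeSet;

/// Model of `update_set`: `true` means insert, `false` means delete.
/// Redundant commands (inserting a present key, deleting an absent one)
/// produce no update, per set semantics.
fn update_set(state: &mut BTreeSet<u32>, cmds: &[(u32, bool)]) -> Vec<(u32, i64)> {
    let mut delta = Vec::new();
    for &(k, insert) in cmds {
        if insert && state.insert(k) {
            delta.push((k, 1)); // element actually entered the set
        } else if !insert && state.remove(&k) {
            delta.push((k, -1)); // element actually left the set
        }
    }
    delta
}
```

The real operator likewise consults its internally maintained trace to suppress no-op commands.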

Source§

impl<C, K, V> Stream<C, Box<DynPairs<K, DynOpt<V>>>>
where K: DataTrait + ?Sized, V: DataTrait + ?Sized, C: Circuit,

Source

pub fn upsert<B>( &self, persistent_id: Option<&str>, factories: &UpsertFactories<<C as WithClock>::Time, B>, ) -> Stream<C, B>
where B: IndexedZSet<Key = K, Val = V>,

Convert a stream of upserts into a stream of updates.

The input stream carries changes to a key/value map in the form of upserts. An upsert assigns a new value to a key (or None to remove the key from the map) without explicitly removing the old value, if any. Upserts are produced by some operators, including Stream::aggregate. The operator converts upserts into batches of updates, which is the input format of most DBSP operators.

The operator assumes that the input vector is sorted by key and contains exactly one value per key.

This is a stateful operator that internally maintains the trace of the collection.
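The conversion can be sketched as follows (plain-Rust model, not the DBSP API; `upsert` and the command layout are illustrative assumptions):

```rust
use std::collections::BTreeMap;

/// Model of `upsert`: `Some(v)` assigns a new value to the key, `None`
/// removes it; the returned Z-set delta reflects the implied retractions
/// and insertions.
fn upsert(
    state: &mut BTreeMap<u32, &'static str>,
    cmds: &[(u32, Option<&'static str>)],
) -> Vec<((u32, &'static str), i64)> {
    let mut delta = Vec::new();
    for &(k, new) in cmds {
        let old = state.get(&k).copied();
        if old == new {
            continue; // assigning the same value (or deleting an absent key) is a no-op
        }
        if let Some(v) = old {
            delta.push(((k, v), -1)); // retract the old value
        }
        match new {
            Some(v) => {
                state.insert(k, v);
                delta.push(((k, v), 1)); // insert the new value
            }
            None => {
                state.remove(&k);
            }
        }
    }
    delta
}
```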

Source§

impl<B> Stream<RootCircuit, B>
where B: IndexedZSet, B::InnerBatch: Send,

Source

pub fn lag<VL, PF>( &self, offset: isize, project: PF, ) -> Stream<RootCircuit, TypedBatch<B::Key, Tup2<B::Val, VL>, ZWeight, DynOrdIndexedZSet<B::DynK, DynPair<B::DynV, DynData>>>>
where VL: DBData, PF: Fn(Option<&B::Val>) -> VL + 'static,

The lag operator matches each row in a group with the value at the given offset in the same group.

For each key in the input stream, it matches each associated value with another value in the same group at a smaller (offset > 0) or greater (offset < 0) index, according to the ascending order of values, applies the projection function project to that value, and outputs the input value along with the projection.

The offset is computed as if each value occurred as many times as its weight. Values with negative weights are ignored; hence the output Z-set will only contain positive weights.

§Arguments
  • offset - offset to the previous value.
  • project - projection function to apply to the delayed row.
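For one group, the offset computation (for offset > 0; offset < 0 is symmetric) can be modeled like this. It is a sketch, not the DBSP API: weights are expanded into repeated occurrences and the projection is the identity:

```rust
/// Model of `lag` within a single group: values carry positive weights and
/// count as that many occurrences in ascending order; each occurrence is
/// paired with the occurrence `offset` positions earlier (None at the start).
fn lag(values: &[(u32, usize)], offset: usize) -> Vec<(u32, Option<u32>)> {
    // Expand weights into individual occurrences, already sorted ascending.
    let expanded: Vec<u32> = values
        .iter()
        .flat_map(|&(v, w)| std::iter::repeat(v).take(w))
        .collect();
    expanded
        .iter()
        .enumerate()
        .map(|(i, &v)| (v, i.checked_sub(offset).map(|j| expanded[j])))
        .collect()
}
```

The real operator additionally applies project to the matched value and assembles the results into an indexed Z-set per key.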
Source§

impl<K, V> Stream<RootCircuit, OrdIndexedZSet<K, V>>
where K: DBData, V: DBData,

Source

pub fn lag_custom_order<VL, OV, PF, CF, OF>( &self, offset: isize, project: PF, output: OF, ) -> Stream<RootCircuit, OrdIndexedZSet<K, OV>>
where VL: DBData, OV: DBData, CF: CmpFunc<V>, PF: Fn(Option<&V>) -> VL + 'static, OF: Fn(&V, &VL) -> OV + 'static,

Like Stream::lag, but uses a custom ordering of values within the group defined by the comparison function CF.

§Arguments
  • offset - offset to the previous or next value.
  • project - projection function to apply to the delayed row.
  • output - output function that constructs the output value from the value of the current row and the projection of the delayed row.
Source

pub fn lag_custom_order_persistent<VL, OV, PF, CF, OF>( &self, persistent_id: Option<&str>, offset: isize, project: PF, output: OF, ) -> Stream<RootCircuit, OrdIndexedZSet<K, OV>>
where VL: DBData, OV: DBData, CF: CmpFunc<V>, PF: Fn(Option<&V>) -> VL + 'static, OF: Fn(&V, &VL) -> OV + 'static,

Source§

impl<B> Stream<RootCircuit, B>
where B: IndexedZSet<DynK = DynData, DynV = DynData>, B::InnerBatch: Send,

Source

pub fn topk_asc( &self, k: usize, ) -> Stream<RootCircuit, OrdIndexedZSet<B::Key, B::Val>>

Pick k smallest values in each group.

For each key in the input stream, removes all but the k smallest values.

Source

pub fn topk_desc( &self, k: usize, ) -> Stream<RootCircuit, OrdIndexedZSet<B::Key, B::Val>>

Pick k largest values in each group.

For each key in the input stream, removes all but the k largest values.
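The per-group behavior of topk_asc can be sketched as follows (a plain-Rust model under the assumption that positive weights count as multiplicities; `topk_asc` here is a hypothetical helper, and topk_desc is the same over descending order):

```rust
use std::collections::BTreeMap;

/// Model of `topk_asc` for one group: keep only the k smallest values,
/// counting each value as `weight` occurrences (weights assumed positive).
fn topk_asc(group: &BTreeMap<u32, i64>, k: i64) -> BTreeMap<u32, i64> {
    let mut remaining = k;
    let mut out = BTreeMap::new();
    // BTreeMap iterates in ascending order of values.
    for (&v, &w) in group {
        if remaining <= 0 {
            break;
        }
        let take = w.min(remaining);
        out.insert(v, take);
        remaining -= take;
    }
    out
}
```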

Source§

impl<K, V> Stream<RootCircuit, OrdIndexedZSet<K, V>>
where K: DBData, V: DBData,

Source

pub fn topk_custom_order<F>(&self, k: usize) -> Self
where F: CmpFunc<V>,

Pick k smallest values in each group based on a custom comparison function.

This method is similar to topk_asc, but instead of ordering elements according to trait Ord for V, it uses a user-defined comparison function F.

§Correctness
  • F must establish a total order over V, consistent with impl Eq for V, i.e., F::cmp(v1, v2) == Equal <=> v1.eq(v2).
Source

pub fn topk_custom_order_persistent<F>( &self, persistent_id: Option<&str>, k: usize, ) -> Self
where F: CmpFunc<V>,

Source

pub fn topk_rank_custom_order<CF, EF, OF, OV>( &self, k: usize, rank_eq_func: EF, output_func: OF, ) -> Stream<RootCircuit, OrdIndexedZSet<K, OV>>
where CF: CmpFunc<V>, OV: DBData, EF: Fn(&V, &V) -> bool + 'static, OF: Fn(i64, &V) -> OV + 'static,

Rank elements in the group and output all elements with rank <= k.

This operator implements the behavior of the following SQL pattern:

SELECT
    ...,
    RANK() OVER (PARTITION BY .. ORDER BY ...) AS rank
FROM table
WHERE rank <= K

The CF type and the rank_eq_func function together establish the ranking of values in the group:

  • CF establishes a total ordering of elements such that v1 < v2 => rank(v1) <= rank(v2).
  • rank_eq_func checks that two elements have equal rank, i.e., have equal values in all the ORDER BY columns in the SQL query above: rank_eq_func(v1, v2) <=> rank(v1) == rank(v2).

The output_func closure takes a value and its rank and produces an output value.

§Correctness
  • CF must establish a total order over V, consistent with impl Eq for V, i.e., CF::cmp(v1, v2) == Equal <=> v1.eq(v2).
  • CF must be consistent with rank_eq_func, i.e., CF::cmp(v1, v2) == Equal => rank_eq_func(v1, v2).
Source

pub fn topk_rank_custom_order_persistent<CF, EF, OF, OV>( &self, persistent_id: Option<&str>, k: usize, rank_eq_func: EF, output_func: OF, ) -> Stream<RootCircuit, OrdIndexedZSet<K, OV>>
where CF: CmpFunc<V>, OV: DBData, EF: Fn(&V, &V) -> bool + 'static, OF: Fn(i64, &V) -> OV + 'static,

Source

pub fn topk_dense_rank_custom_order<CF, EF, OF, OV>( &self, k: usize, rank_eq_func: EF, output_func: OF, ) -> Stream<RootCircuit, OrdIndexedZSet<K, OV>>
where CF: CmpFunc<V>, OV: DBData, EF: Fn(&V, &V) -> bool + 'static, OF: Fn(i64, &V) -> OV + 'static,

Rank elements in the group using dense ranking and output all elements with rank <= k.

Similar to topk_rank_custom_order, but computes a dense ranking of elements in the group.
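The difference between the two ranking schemes can be sketched with std-only helpers (hypothetical, not part of the DBSP API), assuming the group's values are already sorted by the custom order:

```rust
/// SQL `RANK()` semantics: equal values share a rank, and the next
/// distinct value skips past the run (1, 1, 3, ...).
pub fn rank<T>(sorted: &[T], eq: impl Fn(&T, &T) -> bool) -> Vec<i64> {
    let mut ranks: Vec<i64> = Vec::with_capacity(sorted.len());
    for (i, v) in sorted.iter().enumerate() {
        if i > 0 && eq(v, &sorted[i - 1]) {
            let prev = ranks[i - 1];
            ranks.push(prev); // same rank as the previous equal value
        } else {
            ranks.push(i as i64 + 1); // rank = position + 1
        }
    }
    ranks
}

/// SQL `DENSE_RANK()` semantics: the rank grows by one per distinct
/// value, with no gaps (1, 1, 2, ...).
pub fn dense_rank<T>(sorted: &[T], eq: impl Fn(&T, &T) -> bool) -> Vec<i64> {
    let mut ranks = Vec::with_capacity(sorted.len());
    let mut current = 0i64;
    for (i, v) in sorted.iter().enumerate() {
        if i == 0 || !eq(v, &sorted[i - 1]) {
            current += 1;
        }
        ranks.push(current);
    }
    ranks
}
```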

Source

pub fn topk_dense_rank_custom_order_persistent<CF, EF, OF, OV>( &self, persistent_id: Option<&str>, k: usize, rank_eq_func: EF, output_func: OF, ) -> Stream<RootCircuit, OrdIndexedZSet<K, OV>>
where CF: CmpFunc<V>, OV: DBData, EF: Fn(&V, &V) -> bool + 'static, OF: Fn(i64, &V) -> OV + 'static,

Source

pub fn topk_row_number_custom_order<CF, OF, OV>( &self, k: usize, output_func: OF, ) -> Stream<RootCircuit, OrdIndexedZSet<K, OV>>
where CF: CmpFunc<V>, OV: DBData, OF: Fn(i64, &V) -> OV + 'static,

Pick k smallest values in each group based on a custom comparison function. Return the k elements along with their row numbers.

This operator implements the behavior of the following SQL pattern:

SELECT
    ...,
    ROW_NUMBER() OVER (PARTITION BY .. ORDER BY ...) AS row_number
FROM table
WHERE row_number <= K
§Correctness
  • CF must establish a total order over V, consistent with impl Eq for V, i.e., CF::cmp(v1, v2) == Equal <=> v1.eq(v2).
Source

pub fn topk_row_number_custom_order_persistent<CF, OF, OV>( &self, persistent_id: Option<&str>, k: usize, output_func: OF, ) -> Stream<RootCircuit, OrdIndexedZSet<K, OV>>
where CF: CmpFunc<V>, OV: DBData, OF: Fn(i64, &V) -> OV + 'static,

Source§

impl<C, K1, V1> Stream<C, OrdIndexedZSet<K1, V1>>
where C: Circuit, K1: DBData, V1: DBData,

Source

pub fn outer_join_default<F, V2, O>( &self, other: &Stream<C, OrdIndexedZSet<K1, V2>>, join_func: F, ) -> Stream<C, OrdZSet<O>>
where V2: DBData, O: DBData, F: Fn(&K1, &V1, &V2) -> O + Clone + 'static,

Like Stream::outer_join, but uses default value for the missing side of the join.

Source§

impl<C, I1> Stream<C, I1>
where C: Circuit, I1: IndexedZSet, I1::InnerBatch: Send,

Source

pub fn stream_join<F, I2, V>( &self, other: &Stream<C, I2>, join: F, ) -> Stream<C, OrdZSet<V>>
where I2: IndexedZSet<Key = I1::Key, DynK = I1::DynK>, I2::InnerBatch: Send, V: DBData, F: Fn(&I1::Key, &I1::Val, &I2::Val) -> V + Clone + 'static,

Join two streams of batches.

The operator takes two streams of batches indexed with the same key type (I1::Key = I2::Key) and outputs a stream obtained by joining each pair of inputs.

Input streams will typically be produced by the map_index operator.

§Type arguments
  • F - join function type: maps key and a pair of values from input batches to an output value.
  • I1 - batch type in the first input stream.
  • I2 - batch type in the second input stream.
  • V - output value type.
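The per-batch join semantics can be modeled on plain maps (a hypothetical sketch, not the DBSP implementation): for each pair of tuples with matching keys, the output weight is the product of the input weights.

```rust
use std::collections::BTreeMap;

/// In-memory model of `stream_join` on two indexed Z-sets, represented
/// here as maps from key to a list of (value, weight) pairs.
pub fn stream_join_model<K: Ord, V1, V2, O: Ord>(
    left: &BTreeMap<K, Vec<(V1, i64)>>,
    right: &BTreeMap<K, Vec<(V2, i64)>>,
    join: impl Fn(&K, &V1, &V2) -> O,
) -> BTreeMap<O, i64> {
    let mut out = BTreeMap::new();
    for (k, lvals) in left {
        if let Some(rvals) = right.get(k) {
            for (v1, w1) in lvals {
                for (v2, w2) in rvals {
                    // Output weight is the product of the input weights.
                    *out.entry(join(k, v1, v2)).or_insert(0) += w1 * w2;
                }
            }
        }
    }
    out.retain(|_, w| *w != 0); // Z-sets never store zero weights
    out
}
```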
Source

pub fn stream_join_generic<F, I2, Z>( &self, other: &Stream<C, I2>, join: F, ) -> Stream<C, Z>
where I2: IndexedZSet<Key = I1::Key, DynK = I1::DynK>, I2::InnerBatch: Send, Z: IndexedZSet, F: Fn(&I1::Key, &I1::Val, &I2::Val) -> (Z::Key, Z::Val) + Clone + 'static,

Like Self::stream_join, but can return any batch type.

Source

pub fn monotonic_stream_join<F, I2, Z>( &self, other: &Stream<C, I2>, join: F, ) -> Stream<C, Z>
where I1: IndexedZSet, I2: IndexedZSet<Key = I1::Key, DynK = I1::DynK>, I2::InnerBatch: Send, Z: ZSet, F: Fn(&I1::Key, &I1::Val, &I2::Val) -> Z::Key + Clone + 'static,

More efficient than Self::stream_join, but the output of the join function must grow monotonically as (k, v1, v2) tuples are fed to it in lexicographic order.

One such monotonic function is a join function that returns (k, v1, v2) itself.

Source

pub fn stream_antijoin<I2>(&self, other: &Stream<C, I2>) -> Stream<C, I1>
where I1: IndexedZSet, I2: IndexedZSet<Key = I1::Key, DynK = I1::DynK>, I2::Inner: DynFilterMap,

Non-incremental antijoin operator.

Source

pub fn join_generic<I2, F, Z, It>( &self, other: &Stream<C, I2>, join: F, ) -> Stream<C, Z>
where I2: IndexedZSet<Key = I1::Key, DynK = I1::DynK>, I2::InnerBatch: Send, Z: IndexedZSet, Box<Z::DynK>: Clone, Box<Z::DynV>: Clone, F: Fn(&I1::Key, &I1::Val, &I2::Val) -> It + Clone + 'static, It: IntoIterator<Item = (Z::Key, Z::Val)> + 'static,

Source

pub fn outer_join<I2, F, FL, FR, O>( &self, other: &Stream<C, I2>, join_func: F, left_func: FL, right_func: FR, ) -> Stream<C, OrdZSet<O>>
where I2: IndexedZSet<Key = I1::Key, DynK = I1::DynK>, I1::Inner: DynFilterMap, I2::Inner: DynFilterMap, I2::InnerBatch: Send, O: DBData, Box<I1::DynK>: Clone, Box<I1::DynV>: Clone, Box<I2::DynV>: Clone, F: Fn(&I1::Key, &I1::Val, &I2::Val) -> O + Clone + 'static, for<'a> FL: Fn(&I1::Key, &I1::Val) -> O + Clone + 'static, for<'a> FR: Fn(&I2::Key, &I2::Val) -> O + Clone + 'static,

Outer join:

  • returns the output of join_func for common keys.
  • returns the output of left_func for keys only found in self, but not other.
  • returns the output of right_func for keys only found in other, but not self.
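The three cases can be sketched on plain maps (a hypothetical model of one input pair, not the incremental DBSP implementation):

```rust
use std::collections::BTreeMap;

/// Model of `outer_join` on two maps: common keys go through
/// `join_func`; one-sided keys go through `left_func` / `right_func`.
pub fn outer_join_model<K: Ord, V1, V2, O>(
    left: &BTreeMap<K, V1>,
    right: &BTreeMap<K, V2>,
    join_func: impl Fn(&K, &V1, &V2) -> O,
    left_func: impl Fn(&K, &V1) -> O,
    right_func: impl Fn(&K, &V2) -> O,
) -> Vec<O> {
    let mut out = Vec::new();
    for (k, v1) in left {
        match right.get(k) {
            Some(v2) => out.push(join_func(k, v1, v2)), // key on both sides
            None => out.push(left_func(k, v1)),         // key only in left
        }
    }
    for (k, v2) in right {
        if !left.contains_key(k) {
            out.push(right_func(k, v2)); // key only in right
        }
    }
    out
}
```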
Source§

impl<C, I1> Stream<C, I1>
where I1: IndexedZSetReader, I1::Inner: Clone, C: Circuit,

Source

pub fn stream_join_range<I2, RF, JF, It>( &self, other: &Stream<C, I2>, range_func: RF, join_func: JF, ) -> Stream<C, OrdZSet<It::Item>>
where I2: IndexedZSetReader, I2::Inner: Clone, RF: Fn(&I1::Key) -> (I2::Key, I2::Key) + 'static, JF: Fn(&I1::Key, &I1::Val, &I2::Key, &I2::Val) -> It + 'static, It: IntoIterator + 'static, It::Item: DBData,

Range-join two streams into an OrdZSet.

This operator is non-incremental, i.e., it joins the pair of batches it receives at each timestamp ignoring previous inputs.

Source

pub fn stream_join_range_index<I2, K, V, RF, JF, It>( &self, other: &Stream<C, I2>, range_func: RF, join_func: JF, ) -> Stream<C, OrdIndexedZSet<K, V>>
where I2: IndexedZSetReader, I2::Inner: Clone, K: DBData, V: DBData, RF: Fn(&I1::Key) -> (I2::Key, I2::Key) + 'static, JF: Fn(&I1::Key, &I1::Val, &I2::Key, &I2::Val) -> It + 'static, It: IntoIterator<Item = (K, V)> + 'static,

Range-join two streams into an OrdIndexedZSet.

See module documentation for the definition of the range-join operator and its arguments.

In this version of the operator, the join_func closure returns an iterator over (key, value) pairs used to assemble the output indexed Z-set.

This operator is non-incremental, i.e., it joins the pair of batches it receives at each timestamp ignoring previous inputs.

Source

pub fn stream_join_range_generic<I2, K, V, RF, JF, It, O>( &self, other: &Stream<C, I2>, range_func: RF, join_func: JF, ) -> Stream<C, O>
where I2: IndexedZSetReader, I2::Inner: Clone, K: DBData + Erase<O::DynK>, V: DBData + Erase<O::DynV>, RF: Fn(&I1::Key) -> (I2::Key, I2::Key) + 'static, JF: Fn(&I1::Key, &I1::Val, &I2::Key, &I2::Val) -> It + 'static, It: IntoIterator<Item = (K, V)> + 'static, O: IndexedZSet<Key = K, Val = V>,

Like Self::dyn_stream_join_range, but can return any indexed Z-set type.

Source§

impl<B> Stream<RootCircuit, B>
where B: IndexedZSet,

Source

pub fn neighborhood( &self, neighborhood_descr: &NeighborhoodDescrStream<B::Key, B::Val>, ) -> NeighborhoodStream<B>

Returns a small contiguous range of rows (Neighborhood) of the input table.

This operator helps to visualize the contents of the input table in a UI. The UI client may not have enough throughput/memory to store the entire table, and will instead limit its state to a small range of rows that fit on the screen. We specify such a range, or neighborhood, in terms of its center (or “anchor”), and the number of rows preceding and following the anchor (see NeighborhoodDescr). The user may be interested in a static snapshot of the neighborhood or in a changing view. Both modes are supported by this operator (see the reset argument). The output of the operator is a stream of Neighborhoods.

NOTE: This operator assumes that the integral of the input stream does not contain negative weights (which should normally be the case) and may produce incorrect outputs otherwise.

§Arguments
  • self - a stream of changes to an indexed Z-set.

  • neighborhood_descr - contains the neighborhood descriptor to evaluate at every clock tick. Set to None to disable the operator (it will output an empty neighborhood).

§Output

Outputs a stream of changes to the neighborhood.

The output neighborhood will contain rows with indexes between -descr.before and descr.after - 1. Row 0 is the anchor row, i.e., is the first row in the input stream greater than or equal to descr.anchor. If there is no such row (i.e., all rows in the input stream are smaller than the anchor), then the neighborhood will only contain negative indexes.

The first index in the neighborhood may be greater than -descr.before if the input stream doesn’t contain enough rows preceding the specified anchor. The last index may be smaller than descr.after - 1 if the input stream doesn’t contain descr.after rows following the anchor point.
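The index computation can be sketched against a sorted snapshot of the table (hypothetical helper; the real operator is incremental and outputs changes to the neighborhood):

```rust
/// Select a neighborhood from a sorted snapshot. Returns
/// `(index, row)` pairs; index 0 is the first row >= `anchor`.
pub fn neighborhood<T: Ord + Clone>(
    sorted: &[T],
    anchor: &T,
    before: usize,
    after: usize,
) -> Vec<(isize, T)> {
    // Position of the anchor row: first row >= anchor.
    let pos = sorted.partition_point(|row| row < anchor);
    let start = pos.saturating_sub(before);
    let end = (pos + after).min(sorted.len());
    sorted[start..end]
        .iter()
        .enumerate()
        .map(|(i, row)| ((start + i) as isize - pos as isize, row.clone()))
        .collect()
}
```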

Source§

impl<C, D> Stream<C, D>
where D: Clone + 'static, C: Circuit,

Source

pub fn delta0_non_iterative<CC>(&self, subcircuit: &CC) -> Stream<CC, D>
where CC: Circuit<Parent = C>,

Import self from the parent circuit to subcircuit via the Delta0NonIterative operator.

Source§

impl<B> Stream<RootCircuit, B>

Source

pub fn stream_sample_keys( &self, sample_size: &Stream<RootCircuit, usize>, ) -> Stream<RootCircuit, TypedBatch<B::Key, (), ZWeight, DynVecZSet<B::DynK>>>

Generates a uniform random sample of keys from self.

At every clock tick, computes a random sample of the input batch using BatchReader::sample_keys. The sample_size stream specifies the size of the sample to compute (use 0 when no sample is needed at the current clock cycle to make sure the operator doesn’t waste CPU cycles).

The maximum supported sample size is MAX_SAMPLE_SIZE. If the operator receives a larger sample_size value, it is clamped to MAX_SAMPLE_SIZE.

Outputs a Z-set containing randomly sampled keys. Each key is output with weight 1 regardless of its weight or the number of associated values in the input batch.

This is not an incremental operator. It samples the input batch received at the current clock cycle and not the integral of the input stream. Prefix the call to stream_sample_keys() with integrate_trace() to sample the integral of the input.

WARNING: This operator (by definition) returns non-deterministic outputs. As such it may not play well with most other DBSP operators and must be used with care.

Source

pub fn stream_sample_unique_key_vals( &self, sample_size: &Stream<RootCircuit, usize>, ) -> Stream<RootCircuit, TypedBatch<Tup2<B::Key, B::Val>, (), ZWeight, DynVecZSet<DynPair<B::DynK, B::DynV>>>>

Generates a uniform random sample of (key,value) pairs from self, assuming that self contains exactly one value per key.

Equivalent to self.map(|(k, v)| (k, v)).stream_sample_keys(), but is more efficient.

Source

pub fn stream_key_quantiles( &self, num_quantiles: &Stream<RootCircuit, usize>, ) -> Stream<RootCircuit, TypedBatch<B::Key, (), ZWeight, DynVecZSet<B::DynK>>>

Generates a subset of keys that partition the set of all keys in self into num_quantiles + 1 approximately equal-size quantiles.

Internally, this operator uses the stream_sample_keys operator to compute a uniform random sample of size num_quantiles ^ 2 and then picks every num_quantiles-th element of the sample.
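The quantile-picking step can be sketched as follows (a hypothetical helper mirroring the description above, not the DBSP implementation):

```rust
/// Pick at most `q` quantile boundaries from a uniform sample of keys
/// (ideally of size ~q^2): sort the sample and keep every q-th element.
pub fn quantile_keys<T: Ord + Clone>(sample: &mut Vec<T>, q: usize) -> Vec<T> {
    sample.sort();
    sample
        .iter()
        .skip(q.saturating_sub(1)) // first boundary at index q - 1
        .step_by(q.max(1))         // then every q-th element
        .take(q)                   // at most q boundaries
        .cloned()
        .collect()
}
```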

The maximum supported num_quantiles value is MAX_QUANTILES. If the operator receives a larger num_quantiles value, it is clamped to MAX_QUANTILES.

Outputs a Z-set containing at most num_quantiles keys. Each key is output with weight 1 regardless of its weight or the number of associated values in the input batch.

This is not an incremental operator. It samples the input batch received at the current clock cycle and not the integral of the input stream. Prefix the call to stream_key_quantiles() with integrate_trace() to sample the integral of the input.

WARNING: This operator returns non-deterministic outputs, i.e., feeding the same input twice can produce different outputs. As such it may not play well with most other DBSP operators and must be used with care.

Source

pub fn stream_unique_key_val_quantiles( &self, num_quantiles: &Stream<RootCircuit, usize>, ) -> Stream<RootCircuit, TypedBatch<Tup2<B::Key, B::Val>, (), ZWeight, DynVecZSet<DynPair<B::DynK, B::DynV>>>>

Generates a subset of (key, value) pairs that partition the set of all tuples in self into num_quantiles + 1 approximately equal-size quantiles, assuming that self contains exactly one value per key.

Equivalent to self.map(|(k, v)| (k, v)).stream_unique_key_val_quantiles(), but is more efficient.

Source§

impl<C, Pairs> Stream<C, Pairs>
where C: Circuit, Pairs: IndexedZSet, Pairs::InnerBatch: Send,

Source

pub fn semijoin_stream<Keys, Out>( &self, keys: &Stream<C, Keys>, ) -> Stream<C, Out>
where Keys: ZSet<Key = Pairs::Key, DynK = Pairs::DynK>, Keys::InnerBatch: Send, Out: ZSet<Key = (Pairs::Key, Pairs::Val), DynK = DynPair<Pairs::DynK, Pairs::DynV>>,

Semijoin two streams of batches.

The operator takes two streams of batches indexed with the same key type (Pairs::Key = Keys::Key) and outputs a stream obtained by joining each pair of inputs.

Input streams will typically be produced by Stream::map_index().

§Type arguments
  • Pairs - batch type in the first input stream.
  • Keys - batch type in the second input stream.
  • Out - output Z-set type.
Source§

impl<TS, V> Stream<RootCircuit, OrdIndexedZSet<TS, V>>
where TS: DBData + UnsignedPrimInt, V: DBData,

Source

pub fn partitioned_rolling_aggregate_with_waterline<PK, OV, Agg, PF>( &self, waterline: &Stream<RootCircuit, TypedBox<TS, DynDataTyped<TS>>>, partition_func: PF, aggregator: Agg, range: RelRange<TS>, ) -> OrdPartitionedOverStream<PK, TS, Agg::Output>
where PK: DBData, OV: DBData, Agg: Aggregator<OV, (), ZWeight>, PF: Fn(&V) -> (PK, OV) + Clone + 'static,

Similar to partitioned_rolling_aggregate, but uses waterline to bound its memory footprint.

Splits the input stream into non-overlapping partitions using partition_func and for each input record computes an aggregate over a relative time range (e.g., the last three months) within its partition. Outputs the contents of the input stream extended with the value of the aggregate.

This operator is incremental and will update previously computed outputs affected by new data. For example, a data point arriving out-of-order may affect previously computed rolling aggregate values at future times.

The waterline stream bounds the out-of-orderedness of the input data by providing a monotonically growing lower bound on timestamps that can appear in the input stream. The operator does not expect inputs with timestamps smaller than the current waterline. The waterline value is used to bound the amount of state maintained by the operator.

§Background

The rolling aggregate operator is typically applied to time series data with bounded out-of-orderedness, i.e., having seen a timestamp ts in the input stream, the operator will never observe a timestamp smaller than ts - b for some bound b. This in turn means that the value of the aggregate will remain constant for timestamps that only depend on times < ts - b. Hence, we do not need to maintain the state needed to recompute these aggregates, which allows us to bound the amount of state maintained by this operator.

The bound ts - b is known as “waterline” and can be computed, e.g., by the waterline_monotonic operator.

§Arguments
  • self - time series data indexed by time.
  • waterline - monotonically growing lower bound on timestamps in the input stream.
  • partition_func - function used to split inputs into non-overlapping partitions indexed by partition key of type PK.
  • aggregator - aggregator used to summarize values within the relative time range range of each input timestamp.
  • range - relative time range to aggregate over.
Source

pub fn partitioned_rolling_aggregate_with_waterline_persistent<PK, OV, Agg, PF>( &self, persistent_id: Option<&str>, waterline: &Stream<RootCircuit, TypedBox<TS, DynDataTyped<TS>>>, partition_func: PF, aggregator: Agg, range: RelRange<TS>, ) -> OrdPartitionedOverStream<PK, TS, Agg::Output>
where PK: DBData, OV: DBData, Agg: Aggregator<OV, (), ZWeight>, PF: Fn(&V) -> (PK, OV) + Clone + 'static,

Source

pub fn partitioned_rolling_aggregate<PK, OV, Agg, PF>( &self, partition_func: PF, aggregator: Agg, range: RelRange<TS>, ) -> OrdPartitionedOverStream<PK, TS, Agg::Output>
where Agg: Aggregator<OV, (), ZWeight>, OV: DBData, PK: DBData, PF: Fn(&V) -> (PK, OV) + Clone + 'static,

Rolling aggregate of a partitioned stream over time range.

For each record in the input stream, computes an aggregate over a relative time range (e.g., the last three months). Outputs the contents of the input stream extended with the value of the aggregate.

For each input record (p, (ts, v)), rolling aggregation finds all the records (p, (ts2, x)) such that ts2 is in range(ts), applies aggregator across these records to obtain a finalized value f, and outputs (p, (ts, f)).

This operator is incremental and will update previously computed outputs affected by new data. For example, a data point arriving out-of-order may affect previously computed rolling aggregate values at future times.
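A brute-force model of this computation (a hypothetical, non-incremental sketch; the time range is modeled here as symmetric before/after offsets rather than a RelRange):

```rust
/// For each input record (p, (ts, v)), aggregate all values in the same
/// partition whose timestamps fall in [ts - before, ts + after].
pub fn rolling_aggregate<P: Eq + Clone, A>(
    records: &[(P, (u64, i64))],
    before: u64,
    after: u64,
    agg: impl Fn(&[i64]) -> A,
) -> Vec<(P, (u64, A))> {
    records
        .iter()
        .map(|(p, (ts, _))| {
            let window: Vec<i64> = records
                .iter()
                .filter(|(p2, (ts2, _))| {
                    p2 == p && *ts2 >= ts.saturating_sub(before) && *ts2 <= ts + after
                })
                .map(|(_, (_, v))| *v)
                .collect();
            (p.clone(), (*ts, agg(&window)))
        })
        .collect()
}
```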

Source

pub fn partitioned_rolling_aggregate_persistent<PK, OV, Agg, PF>( &self, persistent_id: Option<&str>, partition_func: PF, aggregator: Agg, range: RelRange<TS>, ) -> OrdPartitionedOverStream<PK, TS, Agg::Output>
where Agg: Aggregator<OV, (), ZWeight>, OV: DBData, PK: DBData, PF: Fn(&V) -> (PK, OV) + Clone + 'static,

Source

pub fn partitioned_rolling_aggregate_linear<PK, OV, A, O, PF, WF, OF>( &self, partition_func: PF, weigh_func: WF, output_func: OF, range: RelRange<TS>, ) -> OrdPartitionedOverStream<PK, TS, O>
where PK: DBData, OV: DBData, A: DBWeight + MulByRef<ZWeight, Output = A>, O: DBData, PF: Fn(&V) -> (PK, OV) + Clone + 'static, WF: Fn(&OV) -> A + Clone + 'static, OF: Fn(A) -> O + Clone + 'static,

A version of Self::partitioned_rolling_aggregate optimized for linear aggregation functions. For each input record (p, (ts, v)), it finds all the records (p, (ts2, x)) such that ts2 is in range.range_of(ts), computes the sum s of f(x) across these records, and outputs (p, (ts, Some(output_func(s)))).

Output records from linear aggregation contain an Option type because there might be no records matching range.range_of(ts). If range contains (relative) time 0, this never happens (because the record containing ts itself is always a match), so in that case the caller can safely unwrap() the Option.

In rolling aggregation, the number of output records matches the number of input records.

This method only works for linear aggregation functions f, i.e., functions that satisfy f(a+b) = f(a) + f(b). It will produce incorrect results if f is not linear.

Source

pub fn partitioned_rolling_average<PK, OV, PF>( &self, partition_func: PF, range: RelRange<TS>, ) -> OrdPartitionedOverStream<PK, TS, OV>
where OV: DBWeight + From<ZWeight> + MulByRef<Output = OV> + Div<Output = OV>, PK: DBData, PF: Fn(&V) -> (PK, OV) + Clone + 'static,

Incremental rolling average.

For each input record, computes the average of the values of records in the same partition within the time range specified by range.

Source§

impl<B> Stream<RootCircuit, B>
where B: BatchReader + 'static, B::Inner: Clone,

Source

pub fn waterline_monotonic<TS, DynTS, IF, WF>( &self, init: IF, waterline_func: WF, ) -> Stream<RootCircuit, TypedBox<TS, DynTS>>
where DynTS: Checkpoint + DataTrait + ?Sized, Box<DynTS>: Clone + SizeOf + NumEntries + Rkyv, TS: DBData + Erase<DynTS>, IF: Fn() -> TS + 'static, WF: Fn(&B::Key) -> TS + 'static,

Compute the waterline of a time series, where the waterline function is monotonic in event time. The notion of time here is distinct from the DBSP logical time and can be modeled using any type that implements Ord.

We use the term “waterline” instead of the more conventional “watermark”, to avoid confusion with watermarks in systems like Flink.

Waterline is an attribute of a time series that indicates the latest timestamp such that no data points with timestamps older than the waterline should appear in the stream. Every record in the time series carries waterline information that can be extracted by applying a user-provided function to it. The waterline of the time series is the maximum of waterlines of all its data points.

This method computes the waterline of a time series assuming that the waterline function is monotonic in the event time, e.g., waterline = event_time - 5s. Such waterlines are the most common in practice and can be computed efficiently by only considering the latest timestamp in each input batch. The method takes a stream of batches indexed by timestamp and outputs a stream of waterlines (scalar values). Its output at each timestamp is a scalar (not a Z-set), computed as the maximum of the previous waterline and the largest waterline in the new input batch.
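The running-max semantics can be sketched as follows (hypothetical helper; init and the per-record waterline function stand in for the IF and WF arguments):

```rust
/// Output one waterline per input batch: the max of the previous
/// waterline and the largest waterline extracted from the new batch.
pub fn waterlines<T, TS: Ord + Copy>(
    batches: &[Vec<T>],
    init: TS,
    wf: impl Fn(&T) -> TS,
) -> Vec<TS> {
    let mut current = init;
    batches
        .iter()
        .map(|batch| {
            if let Some(m) = batch.iter().map(&wf).max() {
                current = current.max(m); // waterline never decreases
            }
            current
        })
        .collect()
}
```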

Source§

impl<K, V> Stream<RootCircuit, OrdIndexedZSet<K, V>>
where K: DBData, V: DBData,

Source

pub fn window( &self, inclusive: (bool, bool), bounds: &Stream<RootCircuit, (TypedBox<K, DynData>, TypedBox<K, DynData>)>, ) -> Stream<RootCircuit, OrdIndexedZSet<K, V>>

Extract a subset of values that fall within a moving window from a stream of time-indexed values.

This is a general form of the windowing operator that supports tumbling, rolling windows, watermarks, etc., by relying on a user-supplied function to compute window bounds at each clock cycle.

This operator maintains the window incrementally, i.e., it outputs changes to the contents of the window at each clock cycle. The complete contents of the window can be computed by integrating the output stream.

§Arguments
  • self - stream of indexed Z-sets (indexed by time). The notion of time here is distinct from the DBSP logical time and can be modeled using any type that implements Ord.

  • bounds - stream that contains window bounds to use at each clock cycle. At each clock cycle, it contains a (start_time, end_time) that describes a right-open time range [start_time..end_time), where end_time >= start_time. start_time must grow monotonically, i.e., start_time1 and start_time2 read from the stream at two successive clock cycles must satisfy start_time2 >= start_time1.

§Output

The output stream contains changes to the contents of the window: at every clock cycle it retracts values that belonged to the window at the previous cycle, but no longer do, and inserts new values added to the window. The latter include new values in the input stream that belong to the [start_time..end_time) range and values from earlier inputs that fall within the new range, but not the previous range.
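The per-cycle delta can be modeled as follows (a hypothetical sketch over the integrated input; only times are shown, and weights are ±1):

```rust
/// Given the integrated input and the previous and current window
/// bounds, emit weight -1 for rows leaving the window and +1 for rows
/// entering it.
pub fn window_delta(
    times: &[u64],
    old: (u64, u64), // previous [start, end)
    new: (u64, u64), // current  [start, end)
) -> Vec<(u64, i64)> {
    let in_range = |t: u64, (s, e): (u64, u64)| t >= s && t < e;
    let mut delta = Vec::new();
    for &t in times {
        match (in_range(t, old), in_range(t, new)) {
            (true, false) => delta.push((t, -1)), // retraction
            (false, true) => delta.push((t, 1)),  // insertion
            _ => {}                               // unchanged
        }
    }
    delta
}
```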

§Circuit
                      bounds

───────────────────────────────────────────────────┐
                                                   │
       ┌────────────────────────────────────────┐  │
       │                                        │  │
       │                                        ▼  ▼
self   │     ┌────────────────┐             ┌───────────┐
───────┴────►│ TraceAppend    ├──┐          │ Window    ├─────►
             └────────────────┘  │          └───────────┘
               ▲                 │                 ▲
               │    ┌──┐         │                 │
               └────┤z1│◄────────┘                 │
                    └┬─┘                           │
                     │            trace            │
                     └─────────────────────────────┘
Source§

impl<C, K, V, R, B> Stream<C, TypedBatch<K, V, R, B>>
where C: Circuit, B: DynBatch<Time = ()>, K: DBData + Erase<B::Key>, V: DBData + Erase<B::Val>, R: DBWeight + Erase<B::R>,

Source

pub fn trace(&self) -> Stream<C, TypedBatch<K, V, R, TimedSpine<B, C>>>

Record batches in self in a trace.

This operator labels each untimed batch in the stream with the current timestamp and adds it to a trace.

Source

pub fn trace_with_bound<T>( &self, lower_key_bound: TraceBound<B::Key>, lower_val_bound: TraceBound<B::Val>, ) -> Stream<C, TypedBatch<K, V, R, TimedSpine<B, C>>>

Record batches in self in a trace with bounds lower_key_bound and lower_val_bound.

         ┌─────────────┐ trace
self ───►│ TraceAppend ├─────────┐───► output
         └─────────────┘         │
           ▲                     │
           │                     │
           │ local   ┌───────┐   │z1feedback
           └─────────┤Z1Trace│◄──┘
                     └───────┘
Source§

impl<C, B> Stream<C, B>
where C: Circuit, B: Batch<Time = ()>,

Source

pub fn integrate_trace_retain_keys<TS, RK>( &self, bounds_stream: &Stream<C, TypedBox<TS, DynData>>, retain_key_func: RK, )
where TS: DBData + Erase<DynData>, RK: Fn(&B::Key, &TS) -> bool + Clone + Send + Sync + 'static,

Like integrate_trace, but additionally applies a retainment policy to keys in the trace.

§Background

Relations that store time series data typically have the property that any new updates can only affect records with recent timestamps. Depending on how the relation is used in queries this might mean that, while records with older timestamps still exist in the relation, they cannot affect any future incremental computation and therefore don’t need to be stored.

§Design

We support two mechanisms to specify and eventually discard such unused records.

The first mechanism, exposed via the integrate_trace_with_bound method, is only applicable when keys and/or values in the collection are ordered by time. It allows each consumer of the trace to specify a lower bound on the keys and values it is interested in. The effective bound is the minimum of all bounds specified by individual consumers.

The second mechanism, implemented by this method and the integrate_trace_retain_values method, is more general and allows the caller to specify an arbitrary condition on keys and values in the trace, respectively. Keys or values that don’t satisfy the condition are eventually reclaimed by the trace. This mechanism is applicable to collections that are not ordered by time, and hence doesn’t require rearranging the data in time order. Furthermore, it is applicable to collections that contain multiple timestamp columns. Such multidimensional timestamps only form a partial order.

Unlike the first mechanism, this mechanism allows only one global condition to be applied to the stream. This condition affects all operators that use the trace of the stream, i.e., that call integrate_trace (or trace in the root scope) on it. This includes, for instance, join, aggregate, and distinct. All such operators will reference the same instance of the trace. Therefore, conditions specified by this API must be based on a global analysis of the entire program.

The two mechanisms described above interact in different ways for keys and values. For keys, the lower bound and the retainment condition are independent and can be active at the same time. Internally, they are enforced using different techniques. Lower bounds are enforced at essentially zero cost. The retention condition is more expensive, but more general.

For values, only one of the two mechanisms can be enabled for any given stream. Whenever a retainment condition is specified, it supersedes any lower-bound constraints.

§Arguments
  • bounds_stream - This stream carries scalar values (i.e., single records, not Z-sets). The key retainment condition is defined relative to the last value received from this stream. Typically, this value represents the lowest upper bound of all partially ordered timestamps in self or some other stream, computed with the help of the waterline operator and adjusted by some constant offsets dictated, e.g., by window sizes used in the queries and the maximal out-of-orderedness of data in the input streams.

  • retain_key_func - given a key and the value received from the bounds_stream at the last clock cycle, returns true if the key should be retained in the trace and false if it should be discarded.

§Correctness
  • As discussed above, the retainment policy set using this method applies to all consumers of the trace. An incorrect policy may reclaim keys that are still needed by some of the operators, leading to incorrect results. Computing a correct retainment policy can be a subtle and error prone task, which is probably best left to automatic tools like compilers.

  • The retainment policy set using this method only applies to self, but not to any stream derived from it. In particular, if self is re-sharded using the shard operator, then it may be necessary to call integrate_trace_retain_keys on the resulting stream. In general, computing a correct retainment policy requires keeping track of

    • Streams that are sharded by construction, for which the shard operator is a no-op. For instance, the add_input_set and aggregate operators produce sharded streams.
    • Operators that shard their input streams, e.g., join.
  • This method should be invoked at most once for a stream.

  • retain_key_func must be monotone in its timestamp argument: for any key k and timestamps ts1 <= ts2, if retain_key_func(k, ts1) = false, then retain_key_func(k, ts2) = false, i.e., once a key is rejected, it remains rejected as the bound increases.
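A minimal example of a predicate satisfying this requirement (the GRACE constant and the timestamp encoding are hypothetical): a key is kept while its timestamp is within GRACE of the bound, and once the bound has passed key_ts + GRACE, the result stays false for all larger bound values.

```rust
/// Hypothetical grace period: keys whose timestamps trail the bound by
/// more than this amount are discarded.
const GRACE: u64 = 100;

/// Monotone retainment predicate: for a fixed key, once this returns
/// false for some bound, it returns false for every larger bound.
pub fn retain_key(key_ts: &u64, bound: &u64) -> bool {
    *key_ts + GRACE >= *bound
}
```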

Source

pub fn integrate_trace_retain_values<TS, RV>( &self, bounds_stream: &Stream<C, TypedBox<TS, DynData>>, retain_value_func: RV, )
where TS: DBData + Erase<DynData>, RV: Fn(&B::Val, &TS) -> bool + Clone + Send + Sync + 'static,

Similar to integrate_trace_retain_keys, but applies a retainment policy to values in the trace.

Source

pub fn integrate_trace_retain_values_last_n<TS, RV>( &self, bounds_stream: &Stream<C, TypedBox<TS, DynData>>, retain_value_func: RV, n: usize, )
where TS: DBData + Erase<DynData>, RV: Fn(&B::Val, &TS) -> bool + Clone + Send + Sync + 'static,

Source

pub fn integrate_trace_retain_values_top_n<TS, RV>( &self, bounds_stream: &Stream<C, TypedBox<TS, DynData>>, retain_value_func: RV, n: usize, )
where TS: DBData + Erase<DynData>, RV: Fn(&B::Val, &TS) -> bool + Clone + Send + Sync + 'static,

Source

pub fn integrate_trace(&self) -> Stream<C, Spine<B>>

Constructs and returns an untimed trace of this stream.

The trace is unbounded, meaning that data will not be discarded because it has a low key or value. Filter functions set with integrate_trace_retain_keys or integrate_trace_retain_values can still discard data.

The result batch is stored durably for fault tolerance.

Source

pub fn integrate_trace_with_bound( &self, lower_key_bound: TraceBound<<B::Inner as DynBatchReader>::Key>, lower_val_bound: TraceBound<<B::Inner as DynBatchReader>::Val>, ) -> Stream<C, Spine<B>>
where Spine<B>: SizeOf,

Constructs and returns an untimed trace of this stream.

Data in the trace with a key less than lower_key_bound or value less than lower_val_bound can be discarded, although these bounds can be lowered later (discarding less data). Filter functions set with integrate_trace_retain_keys or integrate_trace_retain_values can still discard data.

The result batch is stored durably for fault tolerance.
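The effect of the two lower bounds can be sketched in plain Rust (this models the discard policy only, not the actual trace data structure): a tuple is eligible for discarding if its key is below the key bound or its value is below the value bound.

```rust
// Sketch of the discard policy only (not the dbsp Spine type): a
// (key, value) tuple may be dropped once key < lower_key_bound or
// value < lower_val_bound.
fn may_discard(key: i32, val: i32, lower_key: i32, lower_val: i32) -> bool {
    key < lower_key || val < lower_val
}

fn main() {
    let trace = vec![(1, 10), (5, 3), (7, 20)];
    let (lower_key, lower_val) = (4, 5);
    let retained: Vec<_> = trace
        .into_iter()
        .filter(|&(k, v)| !may_discard(k, v, lower_key, lower_val))
        .collect();
    // (1, 10) has key < 4; (5, 3) has value < 5; both are eligible to go.
    assert_eq!(retained, vec![(7, 20)]);
}
```

Note that "can be discarded" is a permission, not a guarantee: the trace may retain bounded-out data for some time.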

Source§

impl<K, V> Stream<RootCircuit, OrdIndexedZSet<K, V>>
where K: DBData, V: DBData,

Source

pub fn aggregate<A>( &self, aggregator: A, ) -> Stream<RootCircuit, OrdIndexedZSet<K, A::Output>>
where A: Aggregator<V, (), ZWeight>,

Source

pub fn aggregate_persistent<A>( &self, persistent_id: Option<&str>, aggregator: A, ) -> Stream<RootCircuit, OrdIndexedZSet<K, A::Output>>
where A: Aggregator<V, (), ZWeight>,

Source

pub fn aggregate_linear_postprocess<F, A, OF, OV>( &self, f: F, of: OF, ) -> Stream<RootCircuit, OrdIndexedZSet<K, OV>>
where A: DBWeight + MulByRef<ZWeight, Output = A>, OV: DBData, F: Fn(&V) -> A + Clone + 'static, OF: Fn(A) -> OV + Clone + 'static,

Source

pub fn aggregate_linear_postprocess_persistent<F, A, OF, OV>( &self, persistent_id: Option<&str>, f: F, of: OF, ) -> Stream<RootCircuit, OrdIndexedZSet<K, OV>>
where A: DBWeight + MulByRef<ZWeight, Output = A>, OV: DBData, F: Fn(&V) -> A + Clone + 'static, OF: Fn(A) -> OV + Clone + 'static,

Source

pub fn join<F, V2, OV>( &self, other: &Stream<RootCircuit, OrdIndexedZSet<K, V2>>, join: F, ) -> Stream<RootCircuit, OrdZSet<OV>>
where V2: DBData, OV: DBData, F: Fn(&K, &V, &V2) -> OV + Clone + 'static,

Source

pub fn join_balanced_inner<F, V2, OV>( &self, other: &Stream<RootCircuit, OrdIndexedZSet<K, V2>>, join: F, ) -> Stream<RootCircuit, OrdZSet<OV>>
where V2: DBData, OV: DBData, F: Fn(&K, &V, &V2) -> OV + Clone + 'static,

An adaptive version of join that can change its partitioning policy dynamically to avoid skew.

Source

pub fn join_balanced<F, V2, OV>( &self, other: &Stream<RootCircuit, OrdIndexedZSet<K, V2>>, join: F, ) -> Stream<RootCircuit, OrdZSet<OV>>
where V2: DBData, OV: DBData, F: Fn(&K, &V, &V2) -> OV + Clone + 'static,

Behaves as join_balanced_inner when adaptive joins are enabled in dev tweaks and as join otherwise.

Source

pub fn left_join<F, V2, OV>( &self, other: &Stream<RootCircuit, OrdIndexedZSet<K, Option<V2>>>, join: F, ) -> Stream<RootCircuit, OrdZSet<OV>>
where V2: DBData, OV: DBData, F: Fn(&K, &V, &Option<V2>) -> OV + Clone + 'static,

Left-join self with other.

§Important

The other stream must not contain any None values. If it does, the result may be incorrect. The type signature uses Option for values for performance reasons, to avoid an extra internal transformation.
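As a plain-Rust sketch of this contract (modeling only the semantics, not the dbsp operator): the right-hand side stores values pre-wrapped in Some, and the None that the join function may observe arises only from a missing match, never from a stored None value.

```rust
use std::collections::HashMap;

// Semantics-only sketch of a left join where the right side carries
// Option-wrapped values. All stored values are Some, per the contract;
// None reaches the join function only when a key has no match.
fn left_join_rows<'a>(
    left: &[(&'a str, i32)],
    right: &HashMap<&'a str, Option<i32>>,
) -> Vec<(&'a str, i32, Option<i32>)> {
    left.iter()
        .map(|&(k, v)| (k, v, right.get(k).copied().flatten()))
        .collect()
}

fn main() {
    // Right side built correctly: every stored value is Some.
    let right: HashMap<&str, Option<i32>> =
        [("a", Some(10)), ("c", Some(30))].into_iter().collect();
    let out = left_join_rows(&[("a", 1), ("b", 2), ("c", 3)], &right);
    // "b" has no match, so the join function would see None for it.
    assert_eq!(
        out,
        vec![("a", 1, Some(10)), ("b", 2, None), ("c", 3, Some(30))]
    );
}
```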

Source

pub fn left_join_balanced_inner<F, V2, OV>( &self, other: &Stream<RootCircuit, OrdIndexedZSet<K, Option<V2>>>, join: F, ) -> Stream<RootCircuit, OrdZSet<OV>>
where V2: DBData, OV: DBData, F: Fn(&K, &V, &Option<V2>) -> OV + Clone + 'static,

An adaptive version of left_join that dynamically changes its partitioning policy to avoid skew.

Source

pub fn left_join_balanced<F, V2, OV>( &self, other: &Stream<RootCircuit, OrdIndexedZSet<K, Option<V2>>>, join: F, ) -> Stream<RootCircuit, OrdZSet<OV>>
where V2: DBData, OV: DBData, F: Fn(&K, &V, &Option<V2>) -> OV + Clone + 'static,

Behaves as left_join_balanced_inner when adaptive joins are enabled in dev tweaks and as left_join otherwise.

Source

pub fn join_flatmap<F, V2, OV, It>( &self, other: &Stream<RootCircuit, OrdIndexedZSet<K, V2>>, join: F, ) -> Stream<RootCircuit, OrdZSet<OV>>
where V2: DBData, OV: DBData, F: Fn(&K, &V, &V2) -> It + Clone + 'static, It: IntoIterator<Item = OV> + 'static,

Source

pub fn join_flatmap_balanced_inner<F, V2, OV, It>( &self, other: &Stream<RootCircuit, OrdIndexedZSet<K, V2>>, join: F, ) -> Stream<RootCircuit, OrdZSet<OV>>
where V2: DBData, OV: DBData, F: Fn(&K, &V, &V2) -> It + Clone + 'static, It: IntoIterator<Item = OV> + 'static,

An adaptive version of join_flatmap that dynamically changes its partitioning policy to avoid skew.

Source

pub fn join_flatmap_balanced<F, V2, OV, It>( &self, other: &Stream<RootCircuit, OrdIndexedZSet<K, V2>>, join: F, ) -> Stream<RootCircuit, OrdZSet<OV>>
where V2: DBData, OV: DBData, F: Fn(&K, &V, &V2) -> It + Clone + 'static, It: IntoIterator<Item = OV> + 'static,

Behaves as join_flatmap_balanced_inner when adaptive joins are enabled in dev tweaks and as join_flatmap otherwise.

Source

pub fn left_join_flatmap<F, V2, OV, It>( &self, other: &Stream<RootCircuit, OrdIndexedZSet<K, Option<V2>>>, join: F, ) -> Stream<RootCircuit, OrdZSet<OV>>
where V2: DBData, OV: DBData, F: Fn(&K, &V, &Option<V2>) -> It + Clone + 'static, It: IntoIterator<Item = OV> + 'static,

Like left_join, but can produce multiple output values for each (k, v1, v2) tuple.
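The difference from left_join is the shape of the join closure, sketched here in plain Rust with hypothetical arithmetic outputs: it returns an iterator, so each matched tuple may contribute zero, one, or several output rows.

```rust
// Hypothetical flatmap-style join closure: emits two derived values when
// the right side matched, and one fallback value when it did not.
fn join_func(v1: &i32, v2: &Option<i32>) -> Vec<i32> {
    match v2 {
        Some(v2) => vec![v1 + v2, v1 * v2], // multiple outputs per match
        None => vec![*v1],                  // single output for no match
    }
}

fn main() {
    assert_eq!(join_func(&2, &Some(3)), vec![5, 6]);
    assert_eq!(join_func(&2, &None), vec![2]);
}
```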

Source

pub fn left_join_flatmap_balanced_inner<F, V2, OV, It>( &self, other: &Stream<RootCircuit, OrdIndexedZSet<K, Option<V2>>>, join: F, ) -> Stream<RootCircuit, OrdZSet<OV>>
where V2: DBData, OV: DBData, F: Fn(&K, &V, &Option<V2>) -> It + Clone + 'static, It: IntoIterator<Item = OV> + 'static,

An adaptive version of left_join_flatmap that dynamically changes its partitioning policy to avoid skew.

Source

pub fn left_join_flatmap_balanced<F, V2, OV, It>( &self, other: &Stream<RootCircuit, OrdIndexedZSet<K, Option<V2>>>, join: F, ) -> Stream<RootCircuit, OrdZSet<OV>>
where V2: DBData, OV: DBData, F: Fn(&K, &V, &Option<V2>) -> It + Clone + 'static, It: IntoIterator<Item = OV> + 'static,

Behaves as left_join_flatmap_balanced_inner when adaptive joins are enabled in dev tweaks and as left_join_flatmap otherwise.

Source

pub fn join_index<F, V2, OK, OV, It>( &self, other: &Stream<RootCircuit, OrdIndexedZSet<K, V2>>, join: F, ) -> Stream<RootCircuit, OrdIndexedZSet<OK, OV>>
where V2: DBData, OK: DBData, OV: DBData, F: Fn(&K, &V, &V2) -> It + Clone + 'static, It: IntoIterator<Item = (OK, OV)> + 'static,

Source

pub fn join_index_balanced_inner<F, V2, OK, OV, It>( &self, other: &Stream<RootCircuit, OrdIndexedZSet<K, V2>>, join: F, ) -> Stream<RootCircuit, OrdIndexedZSet<OK, OV>>
where V2: DBData, OK: DBData, OV: DBData, F: Fn(&K, &V, &V2) -> It + Clone + 'static, It: IntoIterator<Item = (OK, OV)> + 'static,

An adaptive version of join_index that dynamically changes its partitioning policy to avoid skew.

Source

pub fn join_index_balanced<F, V2, OK, OV, It>( &self, other: &Stream<RootCircuit, OrdIndexedZSet<K, V2>>, join: F, ) -> Stream<RootCircuit, OrdIndexedZSet<OK, OV>>
where V2: DBData, OK: DBData, OV: DBData, F: Fn(&K, &V, &V2) -> It + Clone + 'static, It: IntoIterator<Item = (OK, OV)> + 'static,

Behaves as join_index_balanced_inner when adaptive joins are enabled in dev tweaks and as join_index otherwise.

Source

pub fn left_join_index<F, V2, OK, OV, It>( &self, other: &Stream<RootCircuit, OrdIndexedZSet<K, Option<V2>>>, join: F, ) -> Stream<RootCircuit, OrdIndexedZSet<OK, OV>>
where V2: DBData, OK: DBData, OV: DBData, F: Fn(&K, &V, &Option<V2>) -> It + Clone + 'static, It: IntoIterator<Item = (OK, OV)> + 'static,

Like left_join_flatmap, but produces an indexed output stream.

Source

pub fn left_join_index_balanced_inner<F, V2, OK, OV, It>( &self, other: &Stream<RootCircuit, OrdIndexedZSet<K, Option<V2>>>, join: F, ) -> Stream<RootCircuit, OrdIndexedZSet<OK, OV>>
where V2: DBData, OK: DBData, OV: DBData, F: Fn(&K, &V, &Option<V2>) -> It + Clone + 'static, It: IntoIterator<Item = (OK, OV)> + 'static,

An adaptive version of left_join_index that dynamically changes its partitioning policy to avoid skew.

Source

pub fn left_join_index_balanced<F, V2, OK, OV, It>( &self, other: &Stream<RootCircuit, OrdIndexedZSet<K, Option<V2>>>, join: F, ) -> Stream<RootCircuit, OrdIndexedZSet<OK, OV>>
where V2: DBData, OK: DBData, OV: DBData, F: Fn(&K, &V, &Option<V2>) -> It + Clone + 'static, It: IntoIterator<Item = (OK, OV)> + 'static,

Behaves as left_join_index_balanced_inner when adaptive joins are enabled in dev tweaks and as left_join_index otherwise.

Source

pub fn antijoin<K2, V2>( &self, other: &Stream<RootCircuit, OrdIndexedZSet<K2, V2>>, ) -> Self
where K2: DBData, V2: DBData,

Source

pub fn distinct(&self) -> Self

Source

pub fn hash_distinct(&self) -> Self

Source

pub fn waterline<TS, WF, IF, LB>( &self, init: IF, extract_ts: WF, least_upper_bound: LB, ) -> Stream<RootCircuit, TypedBox<TS, DynData>>
where TS: DBData, IF: Fn() -> TS + 'static, WF: Fn(&K, &V) -> TS + 'static, LB: Fn(&TS, &TS) -> TS + Clone + 'static,

Source

pub fn waterline_persistent<TS, WF, IF, LB>( &self, persistent_id: Option<&str>, init: IF, extract_ts: WF, least_upper_bound: LB, ) -> Stream<RootCircuit, TypedBox<TS, DynData>>
where TS: DBData, IF: Fn() -> TS + 'static, WF: Fn(&K, &V) -> TS + 'static, LB: Fn(&TS, &TS) -> TS + Clone + 'static,

Source

pub fn filter<F>(&self, filter_func: F) -> Self
where F: Fn((&K, &V)) -> bool + 'static,

Source

pub fn map<F, OK>(&self, map_func: F) -> Stream<RootCircuit, OrdZSet<OK>>
where OK: DBData, F: Fn((&K, &V)) -> OK + Clone + 'static,

Source

pub fn map_index<F, OK, OV>( &self, map_func: F, ) -> Stream<RootCircuit, OrdIndexedZSet<OK, OV>>
where OK: DBData, OV: DBData, F: Fn((&K, &V)) -> (OK, OV) + 'static,

Source

pub fn flat_map<F, I>(&self, func: F) -> Stream<RootCircuit, OrdZSet<I::Item>>
where F: FnMut((&K, &V)) -> I + 'static, I: IntoIterator + 'static, I::Item: DBData,

Source

pub fn flat_map_index<F, OK, OV, I>( &self, func: F, ) -> Stream<RootCircuit, OrdIndexedZSet<OK, OV>>
where F: FnMut((&K, &V)) -> I + 'static, I: IntoIterator<Item = (OK, OV)> + 'static, OK: DBData, OV: DBData,

Source§

impl<K> Stream<RootCircuit, OrdZSet<K>>
where K: DBData,

Source

pub fn distinct(&self) -> Self

Source

pub fn hash_distinct(&self) -> Self

Source

pub fn waterline<TS, WF, IF, LB>( &self, init: IF, extract_ts: WF, least_upper_bound: LB, ) -> Stream<RootCircuit, TypedBox<TS, DynData>>
where TS: DBData, IF: Fn() -> TS + 'static, WF: Fn(&K, &()) -> TS + 'static, LB: Fn(&TS, &TS) -> TS + Clone + 'static,

Source

pub fn waterline_persistent<TS, WF, IF, LB>( &self, persistent_id: Option<&str>, init: IF, extract_ts: WF, least_upper_bound: LB, ) -> Stream<RootCircuit, TypedBox<TS, DynData>>
where TS: DBData, IF: Fn() -> TS + 'static, WF: Fn(&K, &()) -> TS + 'static, LB: Fn(&TS, &TS) -> TS + Clone + 'static,

Source

pub fn filter<F>(&self, filter_func: F) -> Self
where F: Fn(&K) -> bool + 'static,

Source

pub fn map<F, OK>(&self, map_func: F) -> Stream<RootCircuit, OrdZSet<OK>>
where OK: DBData, F: Fn(&K) -> OK + Clone + 'static,

Source

pub fn map_index<F, OK, OV>( &self, map_func: F, ) -> Stream<RootCircuit, OrdIndexedZSet<OK, OV>>
where OK: DBData, OV: DBData, F: Fn(&K) -> (OK, OV) + 'static,

Source

pub fn flat_map<F, I>(&self, func: F) -> Stream<RootCircuit, OrdZSet<I::Item>>
where F: FnMut(&K) -> I + 'static, I: IntoIterator + 'static, I::Item: DBData,

Source

pub fn flat_map_index<F, OK, OV, I>( &self, func: F, ) -> Stream<RootCircuit, OrdIndexedZSet<OK, OV>>
where F: FnMut(&K) -> I + 'static, I: IntoIterator<Item = (OK, OV)> + 'static, OK: DBData, OV: DBData,

Source

pub fn controlled_key_filter_typed<T, E, F, RF>( &self, threshold: &Stream<RootCircuit, T>, filter_func: F, report_func: RF, ) -> (Self, Stream<RootCircuit, OrdZSet<E>>)
where E: DBData, T: DBData, F: Fn(&T, &K) -> bool + 'static, RF: Fn(&T, &K, &(), ZWeight) -> E + 'static,

Source§

impl<K, V> Stream<NestedCircuit, OrdIndexedZSet<K, V>>
where K: DBData, V: DBData,

Source

pub fn aggregate<A>( &self, aggregator: A, ) -> Stream<NestedCircuit, OrdIndexedZSet<K, A::Output>>

Source

pub fn aggregate_persistent<A>( &self, persistent_id: Option<&str>, aggregator: A, ) -> Stream<NestedCircuit, OrdIndexedZSet<K, A::Output>>

Source

pub fn aggregate_linear_postprocess<F, A, OF, OV>( &self, f: F, of: OF, ) -> Stream<NestedCircuit, OrdIndexedZSet<K, OV>>
where A: DBWeight + MulByRef<ZWeight, Output = A>, OV: DBData, F: Fn(&V) -> A + Clone + 'static, OF: Fn(A) -> OV + Clone + 'static,

Source

pub fn aggregate_linear_postprocess_persistent<F, A, OF, OV>( &self, persistent_id: Option<&str>, f: F, of: OF, ) -> Stream<NestedCircuit, OrdIndexedZSet<K, OV>>
where A: DBWeight + MulByRef<ZWeight, Output = A>, OV: DBData, F: Fn(&V) -> A + Clone + 'static, OF: Fn(A) -> OV + Clone + 'static,

Source

pub fn join<F, V2, OV>( &self, other: &Stream<NestedCircuit, OrdIndexedZSet<K, V2>>, join: F, ) -> Stream<NestedCircuit, OrdZSet<OV>>
where V2: DBData, OV: DBData, F: Fn(&K, &V, &V2) -> OV + Clone + 'static,

Source

pub fn join_flatmap<F, V2, OV, It>( &self, other: &Stream<NestedCircuit, OrdIndexedZSet<K, V2>>, join: F, ) -> Stream<NestedCircuit, OrdZSet<OV>>
where V2: DBData, OV: DBData, F: Fn(&K, &V, &V2) -> It + Clone + 'static, It: IntoIterator<Item = OV> + 'static,

Source

pub fn join_index<F, V2, OK, OV, It>( &self, other: &Stream<NestedCircuit, OrdIndexedZSet<K, V2>>, join: F, ) -> Stream<NestedCircuit, OrdIndexedZSet<OK, OV>>
where V2: DBData, OK: DBData, OV: DBData, F: Fn(&K, &V, &V2) -> It + Clone + 'static, It: IntoIterator<Item = (OK, OV)> + 'static,

Source

pub fn antijoin<K2, V2>( &self, other: &Stream<NestedCircuit, OrdIndexedZSet<K2, V2>>, ) -> Self
where K2: DBData, V2: DBData,

Source

pub fn distinct(&self) -> Self

Source

pub fn hash_distinct(&self) -> Self

Source

pub fn filter<F>(&self, filter_func: F) -> Self
where F: Fn((&K, &V)) -> bool + 'static,

Source

pub fn map<F, OK>(&self, map_func: F) -> Stream<NestedCircuit, OrdZSet<OK>>
where OK: DBData, F: Fn((&K, &V)) -> OK + Clone + 'static,

Source

pub fn map_index<F, OK, OV>( &self, map_func: F, ) -> Stream<NestedCircuit, OrdIndexedZSet<OK, OV>>
where OK: DBData, OV: DBData, F: Fn((&K, &V)) -> (OK, OV) + 'static,

Source

pub fn flat_map<F, I>(&self, func: F) -> Stream<NestedCircuit, OrdZSet<I::Item>>
where F: FnMut((&K, &V)) -> I + 'static, I: IntoIterator + 'static, I::Item: DBData,

Source

pub fn flat_map_index<F, OK, OV, I>( &self, func: F, ) -> Stream<NestedCircuit, OrdIndexedZSet<OK, OV>>
where F: FnMut((&K, &V)) -> I + 'static, I: IntoIterator<Item = (OK, OV)> + 'static, OK: DBData, OV: DBData,

Source§

impl<K> Stream<NestedCircuit, OrdZSet<K>>
where K: DBData,

Source

pub fn distinct(&self) -> Self

Source

pub fn hash_distinct(&self) -> Self

Source

pub fn filter<F>(&self, filter_func: F) -> Self
where F: Fn(&K) -> bool + 'static,

Source

pub fn map<F, OK>(&self, map_func: F) -> Stream<NestedCircuit, OrdZSet<OK>>
where OK: DBData, F: Fn(&K) -> OK + Clone + 'static,

Source

pub fn map_index<F, OK, OV>( &self, map_func: F, ) -> Stream<NestedCircuit, OrdIndexedZSet<OK, OV>>
where OK: DBData, OV: DBData, F: Fn(&K) -> (OK, OV) + 'static,

Source

pub fn flat_map<F, I>(&self, func: F) -> Stream<NestedCircuit, OrdZSet<I::Item>>
where F: FnMut(&K) -> I + 'static, I: IntoIterator + 'static, I::Item: DBData,

Source

pub fn flat_map_index<F, OK, OV, I>( &self, func: F, ) -> Stream<NestedCircuit, OrdIndexedZSet<OK, OV>>
where F: FnMut(&K) -> I + 'static, I: IntoIterator<Item = (OK, OV)> + 'static, OK: DBData, OV: DBData,

Source§

impl<PK, TS, V> Stream<ChildCircuit<(), ()>, TypedBatch<PK, Tup2<TS, Option<V>>, i64, FallbackIndexedWSet<dyn Data, dyn Pair<dyn DataTyped<Type = TS>, dyn Opt<dyn Data>>, dyn WeightTyped<Type = i64>>>>
where PK: DBData, V: DBData, TS: DBData,

Source

pub fn map_index<F, OK, OV>( &self, map_func: F, ) -> Stream<RootCircuit, OrdIndexedZSet<OK, OV>>
where OK: DBData, OV: DBData, F: Fn((&PK, &Tup2<TS, Option<V>>)) -> (OK, OV) + 'static,

Trait Implementations§

Source§

impl<C, D> Clone for Stream<C, D>
where C: Clone,

Source§

fn clone(&self) -> Self

Returns a duplicate of the value. Read more
1.0.0 · Source§

fn clone_from(&mut self, source: &Self)

Performs copy-assignment from source. Read more
Source§

impl<C, B> RecursiveStreams<C> for Stream<C, B>

Source§

type Feedback = DelayedFeedback<C, B>

Generalizes the DelayedFeedback type to a group of streams; contains a DelayedFeedback instance for each stream in the group.
Source§

type Export = Stream<<C as Circuit>::Parent, Spine<B>>

Represents streams in the group exported to the parent circuit.
Source§

type Output = Stream<<C as Circuit>::Parent, B>

Type of the final result of the recursive computation: computed output streams exported to the parent circuit and consolidated.
Source§

type Factories = DistinctFactories<B, <C as WithClock>::Time>

Source§

fn new(circuit: &C, factories: &Self::Factories) -> (Self::Feedback, Self)

Create a group of recursive streams along with their feedback connectors.
Source§

fn distinct(self, factories: &Self::Factories) -> Self

Apply distinct to all streams in self.
Source§

fn connect(&self, vars: Self::Feedback)

Close feedback loop for all streams in self.
Source§

fn export(self, factories: &Self::Factories) -> Self::Export

Export all streams in self to the parent circuit.
Source§

fn consolidate( exports: Self::Export, factories: &Self::Factories, ) -> Self::Output

Apply Stream::dyn_consolidate to all streams in exports.
Source§

impl<K, V, B, C> RecursiveStreams<C> for Stream<C, TypedBatch<K, V, ZWeight, B>>
where C: Circuit, C::Parent: Circuit, B: Checkpoint + DynIndexedZSet + Send + Sync, K: DBData + Erase<B::Key>, V: DBData + Erase<B::Val>,

Source§

type Inner = Stream<C, B>

Source§

type Output = Stream<<C as Circuit>::Parent, TypedBatch<K, V, i64, B>>

Source§

unsafe fn typed(inner: &Self::Inner) -> Self

Returns a strongly typed version of the streams. Read more
Source§

unsafe fn typed_exports( inner: &<Self::Inner as DynRecursiveStreams<C>>::Output, ) -> Self::Output

Returns a strongly typed version of output streams. Read more
Source§

fn inner(&self) -> Self::Inner

Source§

fn factories() -> <Self::Inner as DynRecursiveStreams<C>>::Factories

Source§

impl<C, D> StreamMetadata for Stream<C, D>
where C: Clone + 'static, D: 'static,

Source§

fn stream_id(&self) -> StreamId

Source§

fn local_node_id(&self) -> NodeId

Source§

fn origin_node_id(&self) -> &GlobalNodeId

Source§

fn clear_consumer_count(&self)

Resets consumer count to 0.
Source§

fn num_consumers(&self) -> usize

Source§

fn register_consumer(&self)

Invoked by the scheduler exactly once for each consumer operator attached to the stream.
Source§

fn consume_token(&self)

Invoked once per step by each consumer of the stream. When the token count drops to 1, the last consumer can retrieve the value using StreamValue::take. Read more
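The token protocol can be sketched in plain Rust (a simplified single-threaded model, not the actual StreamValue implementation): each consumer spends one token per step, and the consumer holding the final token takes the value by move instead of cloning it.

```rust
use std::cell::{Cell, RefCell};

// Simplified single-threaded model of the consumer-token protocol. Each
// consumer spends one token per step; the consumer holding the last token
// takes the value by move (analogous to StreamValue::take), others clone.
struct Slot<T> {
    tokens: Cell<usize>,
    value: RefCell<Option<T>>,
}

impl<T: Clone> Slot<T> {
    fn new(value: T, consumers: usize) -> Self {
        Slot {
            tokens: Cell::new(consumers),
            value: RefCell::new(Some(value)),
        }
    }

    fn consume(&self) -> T {
        let remaining = self.tokens.get();
        self.tokens.set(remaining - 1);
        if remaining == 1 {
            // Final consumer: take ownership, leaving the slot empty.
            self.value.borrow_mut().take().unwrap()
        } else {
            self.value.borrow().as_ref().unwrap().clone()
        }
    }
}

fn main() {
    let slot = Slot::new(String::from("batch"), 3);
    assert_eq!(slot.consume(), "batch");
    assert_eq!(slot.consume(), "batch");
    assert_eq!(slot.consume(), "batch"); // last token: value moved out
    assert!(slot.value.borrow().is_none());
}
```

This avoids one clone per step for the last consumer, which matters when the value is a large batch.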

Auto Trait Implementations§

§

impl<C, D> Freeze for Stream<C, D>
where C: Freeze,

§

impl<C, D> !RefUnwindSafe for Stream<C, D>

§

impl<C, D> !Send for Stream<C, D>

§

impl<C, D> !Sync for Stream<C, D>

§

impl<C, D> Unpin for Stream<C, D>
where C: Unpin,

§

impl<C, D> UnsafeUnpin for Stream<C, D>
where C: UnsafeUnpin,

§

impl<C, D> !UnwindSafe for Stream<C, D>

Blanket Implementations§

Source§

impl<T> Any for T
where T: 'static + ?Sized,

Source§

fn type_id(&self) -> TypeId

Gets the TypeId of self. Read more
Source§

impl<T> ArchivePointee for T

Source§

type ArchivedMetadata = ()

The archived version of the pointer metadata for this type.
Source§

fn pointer_metadata( _: &<T as ArchivePointee>::ArchivedMetadata, ) -> <T as Pointee>::Metadata

Converts some archived metadata to the pointer metadata for itself.
Source§

impl<T> AsAny for T
where T: 'static,

Source§

fn as_any(&self) -> &(dyn Any + 'static)

Source§

impl<T> Borrow<T> for T
where T: ?Sized,

Source§

fn borrow(&self) -> &T

Immutably borrows from an owned value. Read more
Source§

impl<T> BorrowMut<T> for T
where T: ?Sized,

Source§

fn borrow_mut(&mut self) -> &mut T

Mutably borrows from an owned value. Read more
Source§

impl<T> CloneToUninit for T
where T: Clone,

Source§

unsafe fn clone_to_uninit(&self, dest: *mut u8)

🔬This is a nightly-only experimental API. (clone_to_uninit)
Performs copy-assignment from self to dest. Read more
Source§

impl<F, W, T, D> Deserialize<With<T, W>, D> for F
where W: DeserializeWith<F, T, D>, D: Fallible + ?Sized, F: ?Sized,

Source§

fn deserialize( &self, deserializer: &mut D, ) -> Result<With<T, W>, <D as Fallible>::Error>

Deserializes using the given deserializer
Source§

impl<T> DynClone for T
where T: Clone,

Source§

fn __clone_box(&self, _: Private) -> *mut ()

Source§

impl<T> From<T> for T

Source§

fn from(t: T) -> T

Returns the argument unchanged.

Source§

impl<T> FutureExt for T

Source§

fn with_context(self, otel_cx: Context) -> WithContext<Self>

Attaches the provided Context to this type, returning a WithContext wrapper. Read more
Source§

fn with_current_context(self) -> WithContext<Self>

Attaches the current Context to this type, returning a WithContext wrapper. Read more
Source§

impl<T> Instrument for T

Source§

fn instrument(self, span: Span) -> Instrumented<Self>

Instruments this type with the provided Span, returning an Instrumented wrapper. Read more
Source§

fn in_current_span(self) -> Instrumented<Self>

Instruments this type with the current Span, returning an Instrumented wrapper. Read more
Source§

impl<T, U> Into<U> for T
where U: From<T>,

Source§

fn into(self) -> U

Calls U::from(self).

That is, this conversion is whatever the implementation of From<T> for U chooses to do.

Source§

impl<T> IntoEither for T

Source§

fn into_either(self, into_left: bool) -> Either<Self, Self>

Converts self into a Left variant of Either<Self, Self> if into_left is true. Converts self into a Right variant of Either<Self, Self> otherwise. Read more
Source§

fn into_either_with<F>(self, into_left: F) -> Either<Self, Self>
where F: FnOnce(&Self) -> bool,

Converts self into a Left variant of Either<Self, Self> if into_left(&self) returns true. Converts self into a Right variant of Either<Self, Self> otherwise. Read more
Source§

impl<T> LayoutRaw for T

Source§

fn layout_raw(_: <T as Pointee>::Metadata) -> Result<Layout, LayoutError>

Gets the layout of the type.
Source§

impl<T> Pointable for T

Source§

const ALIGN: usize

The alignment of pointer.
Source§

type Init = T

The type for initializers.
Source§

unsafe fn init(init: <T as Pointable>::Init) -> usize

Initializes a value with the given initializer. Read more
Source§

unsafe fn deref<'a>(ptr: usize) -> &'a T

Dereferences the given pointer. Read more
Source§

unsafe fn deref_mut<'a>(ptr: usize) -> &'a mut T

Mutably dereferences the given pointer. Read more
Source§

unsafe fn drop(ptr: usize)

Drops the object pointed to by the given pointer. Read more
Source§

impl<T> Pointee for T

Source§

type Metadata = ()

The type for metadata in pointers and references to Self.
Source§

impl<T> Pointee for T

Source§

type Metadata = ()

The type for metadata in pointers and references to Self.
Source§

impl<T> PolicyExt for T
where T: ?Sized,

Source§

fn and<P, B, E>(self, other: P) -> And<T, P>
where T: Policy<B, E>, P: Policy<B, E>,

Create a new Policy that returns Action::Follow only if self and other return Action::Follow. Read more
Source§

fn or<P, B, E>(self, other: P) -> Or<T, P>
where T: Policy<B, E>, P: Policy<B, E>,

Create a new Policy that returns Action::Follow if either self or other returns Action::Follow. Read more
Source§

impl<T> Same for T

Source§

type Output = T

Should always be Self
Source§

impl<T> ToOwned for T
where T: Clone,

Source§

type Owned = T

The resulting type after obtaining ownership.
Source§

fn to_owned(&self) -> T

Creates owned data from borrowed data, usually by cloning. Read more
Source§

fn clone_into(&self, target: &mut T)

Uses borrowed data to replace owned data, usually by cloning. Read more
Source§

impl<T, U> TryFrom<U> for T
where U: Into<T>,

Source§

type Error = Infallible

The type returned in the event of a conversion error.
Source§

fn try_from(value: U) -> Result<T, <T as TryFrom<U>>::Error>

Performs the conversion.
Source§

impl<T, U> TryInto<U> for T
where U: TryFrom<T>,

Source§

type Error = <U as TryFrom<T>>::Error

The type returned in the event of a conversion error.
Source§

fn try_into(self) -> Result<U, <U as TryFrom<T>>::Error>

Performs the conversion.
Source§

impl<V, T> VZip<V> for T
where V: MultiLane<T>,

Source§

fn vzip(self) -> V

Source§

impl<T> WithSubscriber for T

Source§

fn with_subscriber<S>(self, subscriber: S) -> WithDispatch<Self>
where S: Into<Dispatch>,

Attaches the provided Subscriber to this type, returning a WithDispatch wrapper. Read more
Source§

fn with_current_subscriber(self) -> WithDispatch<Self>

Attaches the current default Subscriber to this type, returning a WithDispatch wrapper. Read more
Source§

impl<T> Data for T
where T: Clone + 'static,

Source§

impl<T> ErasedDestructor for T
where T: 'static,