Trait Operator 

pub trait Operator: 'static {
    // Required methods
    fn name(&self) -> Cow<'static, str>;
    fn fixedpoint(&self, scope: Scope) -> bool;

    // Provided methods
    fn location(&self) -> OperatorLocation { ... }
    fn init(&mut self, _global_id: &GlobalNodeId) { ... }
    fn metadata(&self, _meta: &mut OperatorMeta) { ... }
    fn clock_start(&mut self, _scope: Scope) { ... }
    fn clock_end(&mut self, _scope: Scope) { ... }
    fn is_async(&self) -> bool { ... }
    fn is_input(&self) -> bool { ... }
    fn ready(&self) -> bool { ... }
    fn register_ready_callback<F>(&mut self, _cb: F)
       where F: Fn() + Send + Sync + 'static { ... }
    fn checkpoint(
        &mut self,
        base: &StoragePath,
        persistent_id: Option<&str>,
        files: &mut Vec<Arc<dyn FileCommitter>>,
    ) -> Result<(), Error> { ... }
    fn restore(
        &mut self,
        base: &StoragePath,
        persistent_id: Option<&str>,
    ) -> Result<(), Error> { ... }
    fn clear_state(&mut self) -> Result<(), Error> { ... }
    fn start_replay(&mut self) -> Result<(), Error> { ... }
    fn is_replay_complete(&self) -> bool { ... }
    fn end_replay(&mut self) -> Result<(), Error> { ... }
    fn start_transaction(&mut self) { ... }
    fn flush(&mut self) { ... }
    fn is_flush_complete(&self) -> bool { ... }
    fn flush_progress(&self) -> Option<Position> { ... }
}
Trait that must be implemented by all operators.

Required Methods§

fn name(&self) -> Cow<'static, str>

Human-readable operator name for debugging purposes.

fn fixedpoint(&self, scope: Scope) -> bool

Check if the operator is in a stable state.

This method is invoked as part of checking if the circuit has reached a fixed point state, i.e., a state where the outputs of all operators will remain constant until the end of the current clock epoch (see Circuit::fixedpoint).

It returns true if the operator’s output is guaranteed to remain constant (i.e., all future outputs will be equal to the last output) as long as its inputs remain constant.

The exact semantics depends on the value of the scope argument, which identifies the circuit whose fixed point state is being checked. Scope 0 is the local circuit. The method is invoked with scope=0 at the end of a clock cycle, and should return true if, assuming that it will see inputs identical to the last input during all future clock cycles in the current clock epoch, it will keep producing the same outputs.

Scope 1 represents the parent of the local circuit. The method is invoked with scope=1 at the end of a clock epoch, and should return true if, assuming that it will see a sequence of inputs (aka the input stream) identical to the last epoch during all future epochs, it will keep producing the same output streams.

Scope 2 represents the grandparent of the local circuit. The method is invoked with scope=2 at the end of the parent clock epoch, and checks that the operator’s output will remain stable with respect to the nested input stream (i.e., stream of streams).

And so on.

The check must be precise. False positives (returning true when the output may change in the future) may lead to early termination before the circuit has reached a fixed point, and hence to incorrect output. False negatives (returning false in a stable state) are only acceptable for a finite number of clock cycles; otherwise they prevent the fixed point computation from converging.

§Warning

Two operators currently violate this requirement: Z1 and Z1Nested. The latter will get phased out soon. The former is work-in-progress. It can be safely used inside nested circuits when carrying changes to collections across iterations of the fixed point computation, but not as part of an integrator circuit (Stream::integrate).
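
As an illustration of how the scope argument to fixedpoint might be interpreted, here is a minimal sketch. It does not use the crate's real Operator trait: SketchOperator, the Scope alias, Negate and RunningSum are all hypothetical stand-ins introduced for this example, and the per-epoch reset in RunningSum is an assumption made to keep the scope=1 case simple.

```rust
use std::borrow::Cow;

/// Stand-in for the crate's `Scope` type (assumed here to be a nesting depth).
type Scope = usize;

/// Hypothetical cut-down trait holding only the two required methods.
trait SketchOperator {
    fn name(&self) -> Cow<'static, str>;
    fn fixedpoint(&self, scope: Scope) -> bool;
}

/// Stateless operator: the output depends only on the current input,
/// so constant inputs imply constant outputs in every scope.
struct Negate;

impl SketchOperator for Negate {
    fn name(&self) -> Cow<'static, str> {
        Cow::Borrowed("Negate")
    }

    fn fixedpoint(&self, _scope: Scope) -> bool {
        true
    }
}

/// Running sum that is reset at the start of every local clock epoch
/// (an assumption of this sketch).
struct RunningSum {
    sum: i64,
    last_input: i64,
}

impl RunningSum {
    fn new() -> Self {
        RunningSum { sum: 0, last_input: 0 }
    }

    /// Start of a local clock epoch: drop all state.
    fn clock_start(&mut self) {
        self.sum = 0;
        self.last_input = 0;
    }

    /// One clock cycle: add the input and output the running total.
    fn eval(&mut self, input: i64) -> i64 {
        self.last_input = input;
        self.sum += input;
        self.sum
    }
}

impl SketchOperator for RunningSum {
    fn name(&self) -> Cow<'static, str> {
        Cow::Borrowed("RunningSum")
    }

    fn fixedpoint(&self, scope: Scope) -> bool {
        if scope == 0 {
            // Seeing the last input again keeps the output constant
            // only if that input contributes nothing to the sum.
            self.last_input == 0
        } else {
            // State is rebuilt from scratch every epoch, so identical
            // input streams produce identical output streams.
            true
        }
    }
}
```

With this sketch, RunningSum reports false for scope 0 right after a non-zero input and true once the most recent input is zero, while the stateless Negate reports true for every scope.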

Provided Methods§

fn location(&self) -> OperatorLocation

The location where the operator was created.

fn init(&mut self, _global_id: &GlobalNodeId)

Initialize the operator.

fn metadata(&self, _meta: &mut OperatorMeta)

Collects metadata about the current operator.

fn clock_start(&mut self, _scope: Scope)

Notify the operator about the start of a new clock epoch.

The clock_start and clock_end methods support the nested circuit architecture. A nested circuit (or subcircuit) is a node in the parent circuit that contains another circuit. The nested circuit has its own clock. Each parent clock tick starts a new child clock epoch. Each operator gets notified about the start and end of a clock epoch in its local circuit and in all of its ancestors.

Formally, operators in a nested circuit operate over nested streams, or streams of streams, with each nested clock epoch starting a new stream. Thus the clock_start and clock_end methods signal respectively the start and completion of a nested stream.

§Examples

For example, feeding the following matrix, where rows represent nested streams,

┌       ┐
│1 2    │
│3 4 5 6│
│7 8 9  │
└       ┘

to an operator requires the following sequence of invocations

clock_start(1) // Start outer clock.
clock_start(0) // Start nested clock (first row of the matrix).
eval(1)
eval(2)
clock_end(0)   // End nested clock.
clock_start(0) // Start nested clock (second row).
eval(3)
eval(4)
eval(5)
eval(6)
clock_end(0)   // End nested clock.
clock_start(0) // Start nested clock (third row).
eval(7)
eval(8)
eval(9)
clock_end(0)   // End nested clock.
clock_end(1)   // End outer clock.

Note that the input and output of most operators belong to the same clock domain, i.e., an operator cannot consume a single value and produce a stream, or the other way around. The only exceptions are ImportOperators, which make the contents of a stream in the parent circuit available inside a subcircuit.

An operator can have multiple input streams, all of which belong to the same clock domain and therefore start and end at the same time. Hence clock_start and clock_end apply to all input and output streams of the operator.
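
The invocation sequence above can be reproduced with a small driver. This is a sketch under stated assumptions: Recorder and feed_nested are hypothetical names, Scope is modelled as a plain usize, and eval takes a single i32, none of which is taken from the crate.

```rust
/// Stand-in for the crate's `Scope` type (assumed to be a nesting depth).
type Scope = usize;

/// Hypothetical operator that only records the calls it receives.
#[derive(Default)]
struct Recorder {
    log: Vec<String>,
}

impl Recorder {
    fn clock_start(&mut self, scope: Scope) {
        self.log.push(format!("clock_start({})", scope));
    }

    fn clock_end(&mut self, scope: Scope) {
        self.log.push(format!("clock_end({})", scope));
    }

    fn eval(&mut self, value: i32) {
        self.log.push(format!("eval({})", value));
    }
}

/// Feeds a matrix whose rows are nested streams to the operator.
fn feed_nested(op: &mut Recorder, rows: &[Vec<i32>]) {
    op.clock_start(1); // Start outer clock.
    for row in rows {
        op.clock_start(0); // Each row is one nested clock epoch.
        for &value in row {
            op.eval(value);
        }
        op.clock_end(0);
    }
    op.clock_end(1); // End outer clock.
}
```

Calling feed_nested with the rows [1, 2], [3, 4, 5, 6] and [7, 8, 9] leaves the same 17 calls in the log, in the same order, as the listing above.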

§Arguments
  • scope - the scope whose clock is restarting.

fn clock_end(&mut self, _scope: Scope)

Notify the operator about the end of the current clock epoch. See clock_start for details.

fn is_async(&self) -> bool

Returns true if self is an asynchronous operator.

An asynchronous operator may need to wait for external inputs, i.e., inputs from outside the circuit. While a regular synchronous operator is ready to be triggered as soon as all of its input streams contain data, an async operator may require additional inputs that arrive asynchronously with respect to the operation of the circuit (e.g., from an I/O device or via an IPC channel).

We do not allow operators to block, therefore the scheduler must not schedule an async operator until it has all external inputs available. The scheduler checks that the operator is ready to execute using the ready method.

fn is_input(&self) -> bool

Returns true if self is an input operator.

An input operator feeds new data into the circuit. Examples are the Input and Generator operators.

fn ready(&self) -> bool

Returns true if self has received all required external inputs and is ready to run.

This method must always return true for synchronous operators. For an asynchronous operator, it returns true if the operator has all external inputs available (see is_async documentation). Once the operator is ready, it remains ready within the current clock cycle, thus the scheduler can safely evaluate the operator.

fn register_ready_callback<F>(&mut self, _cb: F)
where F: Fn() + Send + Sync + 'static,

Register callback to be invoked when an asynchronous operator becomes ready.

This method should only be used for asynchronous operators (see documentation for is_async and ready) in order to enable dynamic schedulers to run async operators as they become ready without continuously polling them. The operator need only support this method being called once, to set a single callback.

Once the callback has been registered, the operator will invoke the callback at every clock cycle, when the operator becomes ready. The callback is invoked with at-least-once semantics, meaning that spurious invocations are possible. The scheduler must always check if the operator is ready to run by calling ready and must be prepared to wait if it returns false.
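
The interplay between is_async, ready and register_ready_callback can be sketched as follows. The types here (AsyncSource, Producer, Shared) are hypothetical and do not implement the crate's Operator trait; the sketch only assumes that an external producer hands values to the operator and that a scheduler re-checks ready after being woken up.

```rust
use std::sync::{Arc, Mutex};

/// State shared between the operator and the external producer.
#[derive(Default)]
struct Shared {
    pending: Option<u64>,
    callback: Option<Arc<dyn Fn() + Send + Sync + 'static>>,
}

/// Hypothetical asynchronous source operator.
struct AsyncSource {
    shared: Arc<Mutex<Shared>>,
}

/// Handle used by code outside the circuit to supply inputs.
struct Producer {
    shared: Arc<Mutex<Shared>>,
}

impl AsyncSource {
    fn new() -> (Self, Producer) {
        let shared = Arc::new(Mutex::new(Shared::default()));
        let source = AsyncSource { shared: Arc::clone(&shared) };
        (source, Producer { shared })
    }

    fn is_async(&self) -> bool {
        true
    }

    /// Ready only once an external input is available.
    fn ready(&self) -> bool {
        self.shared.lock().unwrap().pending.is_some()
    }

    /// Stores a single callback; a later call replaces it.
    fn register_ready_callback<F>(&mut self, cb: F)
    where
        F: Fn() + Send + Sync + 'static,
    {
        self.shared.lock().unwrap().callback = Some(Arc::new(cb));
    }

    /// Consumes the pending input; must only run when `ready()` is true.
    fn eval(&mut self) -> Option<u64> {
        self.shared.lock().unwrap().pending.take()
    }
}

impl Producer {
    /// Makes a value available and wakes up the scheduler.
    fn push(&self, value: u64) {
        let callback = {
            let mut shared = self.shared.lock().unwrap();
            shared.pending = Some(value);
            shared.callback.clone()
        };
        // Invoke the callback outside the lock so that it may call `ready()`.
        if let Some(cb) = callback {
            cb();
        }
    }
}
```

In this sketch the callback is only a wake-up hint. A scheduler built on it would still call ready before evaluating the operator, which matches the at-least-once semantics described above.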

fn checkpoint( &mut self, base: &StoragePath, persistent_id: Option<&str>, files: &mut Vec<Arc<dyn FileCommitter>>, ) -> Result<(), Error>

Instructs the operator to checkpoint its state to persistent storage in directory base. Any files that the operator creates should have persistent_id in their names to keep them unique.

The operator shouldn’t commit the state to stable storage; rather, it should append the files to be committed to files for later commit.

For most operators this method is a no-op.

Fails if the operator is stateful, i.e., expects a checkpoint, but persistent_id is None.
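
The "stage now, commit later" contract of checkpoint can be sketched with in-memory stand-ins. Nothing below comes from the crate: StagedFile replaces Arc<dyn FileCommitter>, a plain &str replaces StoragePath, String replaces Error, and the file naming scheme is invented for illustration.

```rust
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::Arc;

/// Hypothetical stand-in for a file that has been written but not committed.
struct StagedFile {
    path: String,
    contents: Vec<u8>,
    committed: AtomicBool,
}

impl StagedFile {
    /// Called later by whoever owns the checkpoint, not by the operator.
    fn commit(&self) {
        self.committed.store(true, Ordering::SeqCst);
    }

    fn is_committed(&self) -> bool {
        self.committed.load(Ordering::SeqCst)
    }
}

/// Hypothetical stateful operator with a single counter as its state.
struct StatefulCounter {
    count: u64,
}

impl StatefulCounter {
    fn checkpoint(
        &mut self,
        base: &str,
        persistent_id: Option<&str>,
        files: &mut Vec<Arc<StagedFile>>,
    ) -> Result<(), String> {
        // A stateful operator cannot name its files without an id.
        let id = persistent_id
            .ok_or_else(|| String::from("stateful operator has no persistent_id"))?;
        // Stage the state; the file name embeds the id to keep it unique.
        files.push(Arc::new(StagedFile {
            path: format!("{}/{}-counter.dat", base, id),
            contents: self.count.to_le_bytes().to_vec(),
            committed: AtomicBool::new(false),
        }));
        Ok(())
    }
}
```

The operator never calls commit itself. In this sketch the caller collects the staged files from every operator and commits them together once all checkpoints have succeeded.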

fn restore( &mut self, base: &StoragePath, persistent_id: Option<&str>, ) -> Result<(), Error>

Instruct the operator to restore its state from persistent storage in directory base, using persistent_id to find its files.

For most operators this method is a no-op.

fn clear_state(&mut self) -> Result<(), Error>

Clear the operator’s state.

fn start_replay(&mut self) -> Result<(), Error>

Start replaying the operator’s state to the replay stream.

Only defined for operators that support replay.

fn is_replay_complete(&self) -> bool

Check if the operator has finished replaying its state.

Only defined for operators that support replay.

fn end_replay(&mut self) -> Result<(), Error>

Clean up any state needed for replay and prepare the operator for normal operation.

Only defined for operators that support replay.

fn start_transaction(&mut self)

Notify the operator about the start of a transaction.

The operator can initialize any state needed for the transaction.

fn flush(&mut self)

Notifies the operator that all of its predecessors have produced all outputs for the current transaction.

Operators that wait for all inputs to arrive before producing outputs (e.g., join, aggregate, etc.) can use this notification to start processing inputs the next time eval is invoked.

fn is_flush_complete(&self) -> bool

Once flush has been called, this method is invoked after each eval call to check whether all outputs have been produced.

Once this method returns true, its downstream operators can be flushed.

fn flush_progress(&self) -> Option<Position>

Returns the current progress of the operator in processing the current transaction.

Returns a best-effort estimate of the amount of work done by the operator toward processing inputs accumulated before flush was called.

Can return None if the operator is not in flush mode, i.e., outside the period between the call to flush and the point where is_flush_complete returns true.
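
The transaction-related methods (start_transaction, flush, is_flush_complete and flush_progress) can be read as a small state machine. The sketch below is an assumption-laden illustration: BufferedDoubler is a hypothetical operator, Progress is a stand-in for the crate's Position type (whose real fields are not shown on this page), and eval taking an Option<i64> is a simplification.

```rust
/// Hypothetical stand-in for the crate's `Position` type.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
struct Progress {
    processed: usize,
    total: usize,
}

/// Hypothetical operator that buffers inputs until `flush`, then emits
/// doubled values in chunks across subsequent `eval` calls.
struct BufferedDoubler {
    buffered: Vec<i64>,
    next: usize,
    flushing: bool,
    chunk: usize,
}

impl BufferedDoubler {
    fn new(chunk: usize) -> Self {
        // A chunk size of zero would never make progress.
        BufferedDoubler { buffered: Vec::new(), next: 0, flushing: false, chunk: chunk.max(1) }
    }

    /// Reset per-transaction state.
    fn start_transaction(&mut self) {
        self.buffered.clear();
        self.next = 0;
        self.flushing = false;
    }

    /// All predecessors are done: start producing outputs on the next `eval`.
    fn flush(&mut self) {
        self.flushing = true;
    }

    fn eval(&mut self, input: Option<i64>) -> Vec<i64> {
        if let Some(value) = input {
            self.buffered.push(value);
        }
        if !self.flushing {
            return Vec::new(); // Still accumulating inputs.
        }
        let end = (self.next + self.chunk).min(self.buffered.len());
        let out: Vec<i64> = self.buffered[self.next..end].iter().map(|v| v * 2).collect();
        self.next = end;
        out
    }

    fn is_flush_complete(&self) -> bool {
        self.flushing && self.next == self.buffered.len()
    }

    /// Progress is reported only while a flush is in flight.
    fn flush_progress(&self) -> Option<Progress> {
        if self.flushing && !self.is_flush_complete() {
            Some(Progress { processed: self.next, total: self.buffered.len() })
        } else {
            None
        }
    }
}
```

A scheduler following this sketch would call flush once, then alternate eval and is_flush_complete until the latter returns true, and only then flush the downstream operators.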

Dyn Compatibility§

This trait is not dyn compatible.

In older versions of Rust, dyn compatibility was called "object safety", so this trait is not object safe.

Implementors§

impl<B> Operator for Accumulator<B>
where B: Batch,

impl<B: Batch> Operator for FilterIndexedZSet<B>

impl<B: Batch> Operator for FilterZSet<B>

impl<C, B, T> Operator for Z1Trace<C, B, T>
where C: Circuit, B: Batch, T: Trace,

impl<CI, CO> Operator for FlatMapIndexedZSet<CI, CO>
where CI: BatchReader, CO: Batch,

impl<CI, CO> Operator for FlatMapZSet<CI, CO>
where CI: BatchReader, CO: Batch,

impl<CI, CO> Operator for MapIndexedZSet<CI, CO>
where CI: BatchReader, CO: Batch,

impl<CI, CO> Operator for MapZSet<CI, CO>
where CI: BatchReader, CO: Batch,

impl<CI, CO> Operator for Index<CI, CO>
where CI: 'static, CO: BatchReader + 'static,

impl<CI, CO, F> Operator for IndexWith<CI, CO, F>
where CI: 'static, CO: BatchReader + 'static, F: 'static,

impl<D> Operator for Delta0<D>
where D: Data,

impl<D> Operator for Minus<D>
where D: 'static,

impl<D> Operator for Plus<D>
where D: 'static,

impl<D> Operator for Sum<D>
where D: Clone + 'static,

impl<D, T, L> Operator for ExchangeReceiver<D, T, L>
where D: 'static, T: Send + 'static + Clone, L: 'static,

impl<D, T, L> Operator for ExchangeSender<D, T, L>
where D: 'static, T: Send + 'static + Clone, L: 'static,

impl<F> Operator for Apply2<F>
where F: 'static,

impl<F> Operator for Apply2Owned<F>
where F: 'static,

impl<F> Operator for Apply3<F>
where F: 'static,

impl<F, const FP: bool> Operator for Apply<F, FP>
where F: 'static,

impl<I1, I2, O> Operator for StreamJoinRange<I1, I2, O>

impl<I1, I2, Z> Operator for Join<I1, I2, Z>

impl<I1, I2, Z> Operator for MonotonicJoin<I1, I2, Z>

impl<I, B, T, Z, Clk> Operator for JoinTrace<I, B, T, Z, Clk>
where I: WithSnapshot + 'static, B: ZBatch, T: ZBatchReader, Z: IndexedZSet, Clk: WithClock<Time = T::Time> + 'static,

impl<IT, OT, F> Operator for Input<IT, OT, F>
where IT: 'static, OT: 'static, F: 'static,

impl<Pairs, Keys, Out> Operator for SemiJoinStream<Pairs, Keys, Out>
where Pairs: 'static, Keys: 'static, Out: Batch,

impl<R, T> Operator for CsvSource<R, T>
where R: 'static, T: 'static,

impl<T> Operator for UntimedTraceAppend<T>
where T: Trace + 'static,

impl<T> Operator for ConstantGenerator<T>
where T: Data + 'static,

impl<T> Operator for GeneratorNested<T>
where T: Data,

impl<T> Operator for TransactionZ1<T>
where T: Checkpoint + SizeOf + NumEntries + Clone + 'static,

impl<T> Operator for Z1<T>
where T: Checkpoint + SizeOf + NumEntries + Clone + 'static,

impl<T> Operator for Z1Nested<T>
where T: Eq + SizeOf + NumEntries + Clone + 'static,

impl<T, B, Clk> Operator for TraceAppend<T, B, Clk>
where T: Trace, B: BatchReader, Clk: 'static,

impl<T, F> Operator for Generator<T, F>
where T: Data, F: 'static,

impl<T, F> Operator for Inspect<T, F>
where T: 'static, F: FnMut(&T) + 'static,

impl<T, F> Operator for TransactionGenerator<T, F>
where T: Data, F: 'static,

impl<TS, I1, T1, I2, T2, Z> Operator for AsofJoin<TS, I1, T1, I2, T2, Z>

impl<Z> Operator for Distinct<Z>
where Z: 'static,