Skip to main content

dbsp/circuit/
operator_traits.rs

1//! Common traits for DBSP operators.
2//!
3//! Operators are the building blocks of DBSP circuits.  An operator
4//! consumes one or more input streams and produces an output stream.
5
6#![allow(async_fn_in_trait)]
7
8use feldera_storage::{FileCommitter, StoragePath};
9
10use crate::Error;
11use crate::{
12    circuit::{
13        OwnershipPreference, Scope,
14        metadata::{OperatorLocation, OperatorMeta},
15    },
16    trace::cursor::Position,
17};
18use std::borrow::Cow;
19use std::sync::Arc;
20
21use super::GlobalNodeId;
22
/// Marker for the types that operators may exchange as stream values.
///
/// Every type that is both `Clone` and `'static` qualifies; the blanket
/// implementation that follows makes membership automatic.
pub trait Data
where
    Self: Clone + 'static,
{
}

impl<T> Data for T where T: Clone + 'static {}
27
28/// Trait that must be implemented by all operators.
29pub trait Operator: 'static {
30    /// Human-readable operator name for debugging purposes.
31    fn name(&self) -> Cow<'static, str>;
32
33    /// The location the operator was created at
34    fn location(&self) -> OperatorLocation {
35        None
36    }
37
38    /// Initialize the operator
39    fn init(&mut self, _global_id: &GlobalNodeId) {}
40
41    /// Collects metadata about the current operator
42    fn metadata(&self, _meta: &mut OperatorMeta) {}
43
44    /// Notify the operator about the start of a new clock epoch.
45    ///
46    /// `clock_start` and `clock_end` methods support the nested circuit
47    /// architecture.  A nested circuit (or subcircuit) is a node in
48    /// the parent circuit that contains another circuit.  The nested circuit
49    /// has its own clock.  Each parent clock tick starts a new child clock
50    /// epoch.  Each operator gets notified about start and end of a clock
51    /// epoch in its local circuit and all of its ancestors.
52    ///
53    /// Formally, operators in a nested circuit operate over nested streams,
54    /// or streams of streams, with each nested clock epoch starting a new
55    /// stream.  Thus the `clock_start` and `clock_end` methods signal
56    /// respectively the start and completion of a nested stream.
57    ///
58    /// # Examples
59    ///
60    /// For example, feeding the following matrix, where rows represent nested
61    /// streams,
62    ///
63    /// ```text
64    /// ┌       ┐
65    /// │1 2    │
66    /// │3 4 5 6│
67    /// │7 8 9  |
68    /// └       ┘
69    /// ```
70    ///
71    /// to an operator requires the following sequence of invocations
72    ///
73    /// ```text
74    /// clock_start(1) // Start outer clock.
75    /// clock_start(0) // Start nested clock (first row of the matrix).
76    /// eval(1)
77    /// eval(2)
78    /// clock_end(0)   // End nested clock.
79    /// clock_start(0) // Start nested clock (second row).
80    /// eval(3)
81    /// eval(4)
82    /// eval(5)
83    /// eval(6)
84    /// clock_end(0)   // End nested clock.
85    /// clock_start(0) // Start nested clock (third row).
86    /// eval(7)
87    /// eval(8)
88    /// eval(9)
89    /// clock_end(0)   // End nested clock.
90    /// clock_end(1)   // End outer clock.
91    /// ```
92    ///
93    /// Note that the input and output of most operators belong to the same
94    /// clock domain, i.e., an operator cannot consume a single value and
95    /// produce a stream, or the other way around.  The only exception are
96    /// [`ImportOperator`]s that make the contents of a stream in the parent
97    /// circuit available inside a subcircuit.
98    ///
99    /// An operator can have multiple input streams, all of which belong to the
100    /// same clock domain and therefore start and end at the same time.  Hence
101    /// `clock_start` and `clock_end` apply to all input and output streams of
102    /// the operator.
103    ///
104    /// # Arguments
105    ///
106    /// * `scope` - the scope whose clock is restarting.
107    fn clock_start(&mut self, _scope: Scope) {}
108    fn clock_end(&mut self, _scope: Scope) {}
109
110    /// Returns `true` if `self` is an asynchronous operator.
111    ///
112    /// An asynchronous operator may need to wait for external inputs, i.e.,
113    /// inputs from outside the circuit.  While a regular synchronous
114    /// operator is ready to be triggered as soon as all of its input
115    /// streams contain data, an async operator may require additional
116    /// inputs that arrive asynchronously with respect to the operation of
117    /// the circuit (e.g., from an I/O device or via an IPC channel).
118    ///
119    /// We do not allow operators to block, therefore the scheduler must not
120    /// schedule an async operator until it has all external inputs
121    /// available.  The scheduler checks that the operator is ready to
122    /// execute using the [`ready`](`Self::ready`) method.
123    fn is_async(&self) -> bool {
124        false
125    }
126
127    /// Returns `true` if `self` is an input operator.
128    ///
129    /// An input operator feeds new data into the circuit. Examples are
130    /// the `Input` and `Generator` operators.
131    fn is_input(&self) -> bool {
132        false
133    }
134
135    /// Returns `true` if `self` has received all required external inputs and
136    /// is ready to run.
137    ///
138    /// This method must always returns `true` for synchronous operators.  For
139    /// an asynchronous operator, it returns `true` if the operator has all
140    /// external inputs available (see [`is_async`](`Self::is_async`)
141    /// documentation).  Once the operator is ready, it remains ready within
142    /// the current clock cycle, thus the scheduler can safely evaluate the
143    /// operator.
144    fn ready(&self) -> bool {
145        true
146    }
147
148    /// Register callback to be invoked when an asynchronous operator becomes
149    /// ready.
150    ///
151    /// This method should only be used for asynchronous operators (see
152    /// documentation for [`is_async`](`Self::is_async`) and
153    /// [`ready`](`Self::ready`)) in order to enable dynamic schedulers to
154    /// run async operators as they become ready without continuously
155    /// polling them.  The operator need only support this method being
156    /// called once, to set a single callback.
157    ///
158    /// Once the callback has been registered, the operator will invoke the
159    /// callback at every clock cycle, when the operator becomes ready.
160    /// The callback is invoked with at-least-once semantics, meaning that
161    /// spurious invocations are possible.  The scheduler must always check
162    /// if the operator is ready to run by calling [`ready`](`Self::ready`)
163    /// and must be prepared to wait if it returns `false`.
164    fn register_ready_callback<F>(&mut self, _cb: F)
165    where
166        F: Fn() + Send + Sync + 'static,
167    {
168    }
169
170    /// Check if the operator is in a stable state.
171    ///
172    /// This method is invoked as part of checking if the circuit has reached a
173    /// fixed point state, i.e., a state where the outputs of all operators will
174    /// remain constant until the end of the current clock epoch
175    /// (see [`Circuit::fixedpoint`](`crate::circuit::Circuit::fixedpoint`)).
176    ///
177    /// It returns `true` if the operator's output is guaranteed to remain
178    /// constant (i.e., all future outputs will be equal to the last output) as
179    /// long as its inputs remain constant.
180    ///
181    /// The exact semantics depends on the value of the `scope` argument, which
182    /// identifies the circuit whose fixed point state is being checked.
183    /// Scope 0 is the local circuit.  The method is invoked with `scope=0`
184    /// at the end of a clock cycle, and should return `true` if, assuming that
185    /// it will see inputs identical to the last input during all future clock
186    /// cycles in the current clock epoch, it will keep producing the same
187    /// outputs.
188    ///
189    /// Scope 1 represents the parent of the local circuit.  The method is
190    /// invoked with `scope=1` at the end of a clock _epoch_, and should
191    /// return `true` if, assuming that it will see a sequence of inputs
192    /// (aka the input stream) identical to the last epoch during all future
193    /// epochs, it will keep producing the same output streams.
194    ///
195    /// Scope 2 represents the grandparent of the local circuit.  The method is
196    /// invoked with `scope=2` at the end of the parent clock _epoch_, and
197    /// checks that the operator's output will remain stable wrt to the
198    /// nested input stream (i.e., stream of streams).
199    ///
200    /// And so on.
201    ///
202    /// The check must be precise. False positives (returning `true` when the
203    /// output may change in the future) may lead to early termination before
204    /// the circuit has reached a fixed point (and hence incorrect output).
205    /// False negatives (returning `false` in a stable state) is only acceptable
206    /// for a finite number of clock cycles and will otherwise prevent the
207    /// fixedpoint computation from converging.
208    ///
209    /// # Warning
210    ///
211    /// Two operators currently violate this requirement:
212    /// [`Z1`](`crate::operator::Z1`) and
213    /// [`Z1Nested`](`crate::operator::Z1Nested`). The latter will get phased
214    /// out soon.  The former is work-in-progress. It can be safely used inside
215    /// nested circuits when carrying changes to collections across iterations
216    /// of the fixed point computation, but not as part of an integrator circuit
217    /// ([`Stream::integrate`](`crate::circuit::Stream::integrate`)).
218    fn fixedpoint(&self, scope: Scope) -> bool;
219
220    /// Instructs the operator to checkpoint its state to persistent storage in
221    /// directory `base`. Any files that the operator creates should have
222    /// `persistent_id` in their names to keep them unique.
223    ///
224    /// The operator shouldn't commit the state to stable storage; rather, it
225    /// should append the files to be committed to `files` for later commit.
226    ///
227    /// For most operators this method is a no-op.
228    ///
229    /// Fails if the operator is stateful, i.e., expects a checkpoint, by
230    /// `persistent_id` is `None`
231    #[allow(unused_variables)]
232    fn checkpoint(
233        &mut self,
234        base: &StoragePath,
235        persistent_id: Option<&str>,
236        files: &mut Vec<Arc<dyn FileCommitter>>,
237    ) -> Result<(), Error> {
238        Ok(())
239    }
240
241    /// Instruct the operator to restore its state from persistent storage in
242    /// directory `base`, using `persistent_id` to find its files.
243    ///
244    /// For most operators this method is a no-op.
245    #[allow(unused_variables)]
246    fn restore(&mut self, base: &StoragePath, persistent_id: Option<&str>) -> Result<(), Error> {
247        Ok(())
248    }
249
250    /// Clear the operator's state.
251    fn clear_state(&mut self) -> Result<(), Error> {
252        Ok(())
253    }
254
255    /// Start replaying the operator's state to the replay stream.
256    ///
257    /// Only defined for operators that support replay.
258    fn start_replay(&mut self) -> Result<(), Error> {
259        panic!("start_replay() is not implemented for this operator")
260    }
261
262    /// Check if the operator has finished replaying its state.
263    ///
264    /// Only defined for operators that support replay.
265    fn is_replay_complete(&self) -> bool {
266        panic!("is_replay_complete() is not implemented for this operator")
267    }
268
269    /// Cleanup any state needed for replay and prepare the operator for normal operation.
270    ///
271    /// Only defined for operators that support replay.
272    fn end_replay(&mut self) -> Result<(), Error> {
273        panic!("end_replay() is not implemented for this operator")
274    }
275
276    /// Notify the operator about start of a transaction.
277    ///
278    /// The operator can initialize any state needed for the transaction.
279    fn start_transaction(&mut self) {}
280
281    /// Notifies the operator that all of its predecessors have produced
282    /// all outputs for the current transaction.
283    ///
284    /// Operators that wait for all inputs to arrive before producing
285    /// outputs (e.g., join, aggregate, etc.) can use this notification to
286    /// start processing inputs the next time `eval` is invoked.
287    fn flush(&mut self) {}
288
289    /// Invoked after `flush` after each `eval` call to check if all outputs
290    /// have been produced.
291    ///
292    /// Once this method returns `true`, its downstream operators can be flushed.
293    fn is_flush_complete(&self) -> bool {
294        true
295    }
296
297    /// Returns the current progress of the operator in processing the current transaction.
298    ///
299    /// Returns a best-effort estimate of the amount of work done by the operator
300    /// toward processing inputs accumulated before `flush` was called.
301    ///
302    /// Can return `None` if the operator is not in flush mode (i.e., between
303    /// `flush` was called and `is_flush_complete` returns `true`).
304    fn flush_progress(&self) -> Option<Position> {
305        None
306    }
307}
308
309/// A source operator that injects data from the outside world or from the
310/// parent circuit into the local circuit.  Consumes no input streams and emits
311/// a single output stream.
312pub trait SourceOperator<O>: Operator {
313    /// Yield the next value.
314    async fn eval(&mut self) -> O;
315}
316
317/// A sink operator consumes an input stream, but does not produce an output
318/// stream.  Such operators are used to send results of the computation
319/// performed by the circuit to the outside world.
320pub trait SinkOperator<I>: Operator {
321    /// Consume input by reference.
322    async fn eval(&mut self, input: &I);
323
324    /// Consume input by value.
325    async fn eval_owned(&mut self, input: I) {
326        self.eval(&input).await
327    }
328
329    /// Ownership preference on the operator's input stream
330    /// (see [`OwnershipPreference`]).
331    fn input_preference(&self) -> OwnershipPreference {
332        OwnershipPreference::INDIFFERENT
333    }
334}
335
336/// A sink operator that consumes two input streams, but does not produce
337/// an output stream.  Such operators are used to send results of the
338/// computation performed by the circuit to the outside world.
339pub trait BinarySinkOperator<I1, I2>: Operator
340where
341    I1: Clone,
342    I2: Clone,
343{
344    /// Consume inputs.
345    ///
346    /// The operator must be prepared to handle any combination of
347    /// owned and borrowed inputs.
348    async fn eval<'a>(&mut self, lhs: Cow<'a, I1>, rhs: Cow<'a, I2>);
349
350    /// Ownership preference on the operator's input streams
351    /// (see [`OwnershipPreference`]).
352    fn input_preference(&self) -> (OwnershipPreference, OwnershipPreference) {
353        (
354            OwnershipPreference::INDIFFERENT,
355            OwnershipPreference::INDIFFERENT,
356        )
357    }
358}
359
360/// A sink operator that consumes three input streams, but does not produce
361/// an output stream.  Such operators are used to send results of the
362/// computation performed by the circuit to the outside world.
363pub trait TernarySinkOperator<I1, I2, I3>: Operator
364where
365    I1: Clone,
366    I2: Clone,
367    I3: Clone,
368{
369    /// Consume inputs.
370    ///
371    /// The operator must be prepared to handle any combination of
372    /// owned and borrowed inputs.
373    async fn eval<'a>(&mut self, input1: Cow<'a, I1>, input2: Cow<'a, I2>, input3: Cow<'a, I3>);
374
375    /// Ownership preference on the operator's input streams
376    /// (see [`OwnershipPreference`]).
377    fn input_preference(
378        &self,
379    ) -> (
380        OwnershipPreference,
381        OwnershipPreference,
382        OwnershipPreference,
383    ) {
384        (
385            OwnershipPreference::INDIFFERENT,
386            OwnershipPreference::INDIFFERENT,
387            OwnershipPreference::INDIFFERENT,
388        )
389    }
390}
391
392/// A unary operator that consumes a stream of inputs of type `I`
393/// and produces a stream of outputs of type `O`.
394pub trait UnaryOperator<I, O>: Operator {
395    /// Consume input by reference.
396    async fn eval(&mut self, input: &I) -> O;
397
398    /// Consume input by value.
399    async fn eval_owned(&mut self, input: I) -> O {
400        self.eval(&input).await
401    }
402
403    /// Ownership preference on the operator's input stream
404    /// (see [`OwnershipPreference`]).
405    fn input_preference(&self) -> OwnershipPreference {
406        OwnershipPreference::INDIFFERENT
407    }
408}
409
410/// A binary operator consumes two input streams carrying values
411/// of types `I1` and `I2` and produces a stream of outputs of type `O`.
412pub trait BinaryOperator<I1, I2, O>: Operator {
413    /// Consume input by reference.
414    async fn eval(&mut self, lhs: &I1, rhs: &I2) -> O;
415
416    /// Consume input by value.
417    async fn eval_owned(&mut self, lhs: I1, rhs: I2) -> O {
418        self.eval(&lhs, &rhs).await
419    }
420
421    /// Consume the first input by value and the second by reference.
422    async fn eval_owned_and_ref(&mut self, lhs: I1, rhs: &I2) -> O {
423        self.eval(&lhs, rhs).await
424    }
425
426    /// Consume the first input by reference and the second by value.
427    async fn eval_ref_and_owned(&mut self, lhs: &I1, rhs: I2) -> O {
428        self.eval(lhs, &rhs).await
429    }
430
431    /// Ownership preference on the operator's input streams
432    /// (see [`OwnershipPreference`]).
433    fn input_preference(&self) -> (OwnershipPreference, OwnershipPreference) {
434        (
435            OwnershipPreference::INDIFFERENT,
436            OwnershipPreference::INDIFFERENT,
437        )
438    }
439}
440
441/// A ternary operator consumes three input streams carrying values
442/// of types `I1`, `I2`, and `I3` and produces a stream of outputs of type `O`.
443pub trait TernaryOperator<I1, I2, I3, O>: Operator
444where
445    I1: Clone,
446    I2: Clone,
447    I3: Clone,
448{
449    /// Consume inputs.
450    ///
451    /// The operator must be prepared to handle any combination of
452    /// owned and borrowed inputs.
453    async fn eval(&mut self, i1: Cow<'_, I1>, i2: Cow<'_, I2>, i3: Cow<'_, I3>) -> O;
454
455    fn input_preference(
456        &self,
457    ) -> (
458        OwnershipPreference,
459        OwnershipPreference,
460        OwnershipPreference,
461    ) {
462        (
463            OwnershipPreference::INDIFFERENT,
464            OwnershipPreference::INDIFFERENT,
465            OwnershipPreference::INDIFFERENT,
466        )
467    }
468}
469
470/// A quaternary operator consumes four input streams carrying values
471/// of types `I1`, `I2`, `I3`, and `I4` and produces a stream of outputs of type
472/// `O`.
473pub trait QuaternaryOperator<I1, I2, I3, I4, O>: Operator
474where
475    I1: Clone,
476    I2: Clone,
477    I3: Clone,
478    I4: Clone,
479{
480    /// Consume inputs.
481    ///
482    /// The operator must be prepared to handle any combination of
483    /// owned and borrowed inputs.
484    async fn eval(
485        &mut self,
486        i1: Cow<'_, I1>,
487        i2: Cow<'_, I2>,
488        i3: Cow<'_, I3>,
489        i4: Cow<'_, I4>,
490    ) -> O;
491
492    fn input_preference(
493        &self,
494    ) -> (
495        OwnershipPreference,
496        OwnershipPreference,
497        OwnershipPreference,
498        OwnershipPreference,
499    ) {
500        (
501            OwnershipPreference::INDIFFERENT,
502            OwnershipPreference::INDIFFERENT,
503            OwnershipPreference::INDIFFERENT,
504            OwnershipPreference::INDIFFERENT,
505        )
506    }
507}
508
509/// An operator that consumes any number of streams carrying values
510/// of type `I` and produces a stream of outputs of type `O`.
511pub trait NaryOperator<I, O>: Operator
512where
513    I: Clone + 'static,
514{
515    /// Consume inputs.
516    ///
517    /// The operator must be prepared to handle any combination of
518    /// owned and borrowed inputs.
519    async fn eval<'a, Iter>(&'a mut self, inputs: Iter) -> O
520    where
521        Iter: Iterator<Item = Cow<'a, I>>;
522
523    /// Ownership preference on the operator's input streams
524    /// (see [`OwnershipPreference`]).
525    fn input_preference(&self) -> OwnershipPreference {
526        OwnershipPreference::INDIFFERENT
527    }
528}
529
530/// A "strict operator" is one whose output only depends on inputs from previous
531/// timestamps and hence can be produced before consuming new inputs.  This way
532/// a strict operator can be used as part of a feedback loop where its output is
533/// needed before input for the current timestamp is available.
534///
535/// The only strict operators that DBSP makes available are [Z1] and its variant
536/// [Z1Nested].
537///
538/// [Z1]: crate::operator::Z1
539/// [Z1Nested]: crate::operator::Z1Nested
540/// [Z1Trace]: crate::operator::dynamic::trace::Z1Trace
541pub trait StrictOperator<O>: Operator {
542    /// Returns the output value computed based on data consumed by the operator
543    /// during previous timestamps.  This method is invoked **before**
544    /// `eval_strict()` has been invoked for the current timestamp.  It can
545    /// be invoked **at most once** for each timestamp,
546    /// as the implementation may mutate or destroy the operator's internal
547    /// state (for example [Z1](`crate::operator::Z1`) returns its inner
548    /// value, leaving the operator empty).
549    fn get_output(&mut self) -> O;
550
551    fn get_final_output(&mut self) -> O;
552}
553
554/// A strict unary operator that consumes a stream of inputs of type `I`
555/// by reference and produces a stream of outputs of type `O`.
556pub trait StrictUnaryOperator<I, O>: StrictOperator<O> {
557    /// Feed input for the current timestamp to the operator by reference.  The
558    /// output will be consumed via
559    /// [`get_output`](`StrictOperator::get_output`) during the
560    /// next timestamp.
561    async fn eval_strict(&mut self, input: &I);
562
563    /// Feed input for the current timestamp to the operator by value.  The
564    /// output will be consumed via
565    /// [`get_output`](`StrictOperator::get_output`) during the
566    /// next timestamp.
567    async fn eval_strict_owned(&mut self, input: I) {
568        self.eval_strict(&input).await
569    }
570
571    /// Ownership preference on the operator's input stream
572    /// (see [`OwnershipPreference`]).
573    fn input_preference(&self) -> OwnershipPreference {
574        OwnershipPreference::INDIFFERENT
575    }
576}
577
578/// An import operator makes a stream from the parent circuit
579/// available inside a subcircuit.
580///
581/// Import operators are the only kind of operator that span
582/// two clock domains: an import operator reads a single
583/// value from the parent stream per parent clock tick and produces
584/// a stream of outputs in the nested circuit, one for each nested
585/// clock tick.
586///
587/// See [`Delta0`](`crate::operator::Delta0`) for a concrete example
588/// of an import operator.
589pub trait ImportOperator<I, O>: Operator {
590    /// Consumes a value from the parent stream by reference.
591    ///
592    /// Either `import` or [`Self::import_owned`] is invoked once per
593    /// nested clock epoch, right after `clock_start(0)`.
594    fn import(&mut self, val: &I);
595
596    /// Consumes a value from the parent stream by value.
597    fn import_owned(&mut self, val: I);
598
599    /// Invoked once per nested clock cycle to write a value to
600    /// the output stream.
601    async fn eval(&mut self) -> O;
602
603    /// Ownership preference on the operator's input stream
604    /// (see [`OwnershipPreference`]).
605    fn input_preference(&self) -> OwnershipPreference {
606        OwnershipPreference::INDIFFERENT
607    }
608}