dbsp/circuit/operator_traits.rs
1//! Common traits for DBSP operators.
2//!
3//! Operators are the building blocks of DBSP circuits. An operator
4//! consumes one or more input streams and produces an output stream.
5
6#![allow(async_fn_in_trait)]
7
8use feldera_storage::{FileCommitter, StoragePath};
9
10use crate::Error;
11use crate::{
12 circuit::{
13 OwnershipPreference, Scope,
14 metadata::{OperatorLocation, OperatorMeta},
15 },
16 trace::cursor::Position,
17};
18use std::borrow::Cow;
19use std::sync::Arc;
20
21use super::GlobalNodeId;
22
/// Marker trait for values that operators exchange through streams.
///
/// Any type that is `Clone` and `'static` satisfies it automatically through
/// the blanket implementation below, so it never has to be implemented by
/// hand.
pub trait Data: Clone + 'static {}

impl<T> Data for T where T: Clone + 'static {}
27
28/// Trait that must be implemented by all operators.
29pub trait Operator: 'static {
30 /// Human-readable operator name for debugging purposes.
31 fn name(&self) -> Cow<'static, str>;
32
33 /// The location the operator was created at
34 fn location(&self) -> OperatorLocation {
35 None
36 }
37
38 /// Initialize the operator
39 fn init(&mut self, _global_id: &GlobalNodeId) {}
40
41 /// Collects metadata about the current operator
42 fn metadata(&self, _meta: &mut OperatorMeta) {}
43
44 /// Notify the operator about the start of a new clock epoch.
45 ///
46 /// `clock_start` and `clock_end` methods support the nested circuit
47 /// architecture. A nested circuit (or subcircuit) is a node in
48 /// the parent circuit that contains another circuit. The nested circuit
49 /// has its own clock. Each parent clock tick starts a new child clock
50 /// epoch. Each operator gets notified about start and end of a clock
51 /// epoch in its local circuit and all of its ancestors.
52 ///
53 /// Formally, operators in a nested circuit operate over nested streams,
54 /// or streams of streams, with each nested clock epoch starting a new
55 /// stream. Thus the `clock_start` and `clock_end` methods signal
56 /// respectively the start and completion of a nested stream.
57 ///
58 /// # Examples
59 ///
60 /// For example, feeding the following matrix, where rows represent nested
61 /// streams,
62 ///
63 /// ```text
64 /// ┌ ┐
65 /// │1 2 │
66 /// │3 4 5 6│
67 /// │7 8 9 |
68 /// └ ┘
69 /// ```
70 ///
71 /// to an operator requires the following sequence of invocations
72 ///
73 /// ```text
74 /// clock_start(1) // Start outer clock.
75 /// clock_start(0) // Start nested clock (first row of the matrix).
76 /// eval(1)
77 /// eval(2)
78 /// clock_end(0) // End nested clock.
79 /// clock_start(0) // Start nested clock (second row).
80 /// eval(3)
81 /// eval(4)
82 /// eval(5)
83 /// eval(6)
84 /// clock_end(0) // End nested clock.
85 /// clock_start(0) // Start nested clock (third row).
86 /// eval(7)
87 /// eval(8)
88 /// eval(9)
89 /// clock_end(0) // End nested clock.
90 /// clock_end(1) // End outer clock.
91 /// ```
92 ///
93 /// Note that the input and output of most operators belong to the same
94 /// clock domain, i.e., an operator cannot consume a single value and
95 /// produce a stream, or the other way around. The only exception are
96 /// [`ImportOperator`]s that make the contents of a stream in the parent
97 /// circuit available inside a subcircuit.
98 ///
99 /// An operator can have multiple input streams, all of which belong to the
100 /// same clock domain and therefore start and end at the same time. Hence
101 /// `clock_start` and `clock_end` apply to all input and output streams of
102 /// the operator.
103 ///
104 /// # Arguments
105 ///
106 /// * `scope` - the scope whose clock is restarting.
107 fn clock_start(&mut self, _scope: Scope) {}
108 fn clock_end(&mut self, _scope: Scope) {}
109
110 /// Returns `true` if `self` is an asynchronous operator.
111 ///
112 /// An asynchronous operator may need to wait for external inputs, i.e.,
113 /// inputs from outside the circuit. While a regular synchronous
114 /// operator is ready to be triggered as soon as all of its input
115 /// streams contain data, an async operator may require additional
116 /// inputs that arrive asynchronously with respect to the operation of
117 /// the circuit (e.g., from an I/O device or via an IPC channel).
118 ///
119 /// We do not allow operators to block, therefore the scheduler must not
120 /// schedule an async operator until it has all external inputs
121 /// available. The scheduler checks that the operator is ready to
122 /// execute using the [`ready`](`Self::ready`) method.
123 fn is_async(&self) -> bool {
124 false
125 }
126
127 /// Returns `true` if `self` is an input operator.
128 ///
129 /// An input operator feeds new data into the circuit. Examples are
130 /// the `Input` and `Generator` operators.
131 fn is_input(&self) -> bool {
132 false
133 }
134
135 /// Returns `true` if `self` has received all required external inputs and
136 /// is ready to run.
137 ///
138 /// This method must always returns `true` for synchronous operators. For
139 /// an asynchronous operator, it returns `true` if the operator has all
140 /// external inputs available (see [`is_async`](`Self::is_async`)
141 /// documentation). Once the operator is ready, it remains ready within
142 /// the current clock cycle, thus the scheduler can safely evaluate the
143 /// operator.
144 fn ready(&self) -> bool {
145 true
146 }
147
148 /// Register callback to be invoked when an asynchronous operator becomes
149 /// ready.
150 ///
151 /// This method should only be used for asynchronous operators (see
152 /// documentation for [`is_async`](`Self::is_async`) and
153 /// [`ready`](`Self::ready`)) in order to enable dynamic schedulers to
154 /// run async operators as they become ready without continuously
155 /// polling them. The operator need only support this method being
156 /// called once, to set a single callback.
157 ///
158 /// Once the callback has been registered, the operator will invoke the
159 /// callback at every clock cycle, when the operator becomes ready.
160 /// The callback is invoked with at-least-once semantics, meaning that
161 /// spurious invocations are possible. The scheduler must always check
162 /// if the operator is ready to run by calling [`ready`](`Self::ready`)
163 /// and must be prepared to wait if it returns `false`.
164 fn register_ready_callback<F>(&mut self, _cb: F)
165 where
166 F: Fn() + Send + Sync + 'static,
167 {
168 }
169
170 /// Check if the operator is in a stable state.
171 ///
172 /// This method is invoked as part of checking if the circuit has reached a
173 /// fixed point state, i.e., a state where the outputs of all operators will
174 /// remain constant until the end of the current clock epoch
175 /// (see [`Circuit::fixedpoint`](`crate::circuit::Circuit::fixedpoint`)).
176 ///
177 /// It returns `true` if the operator's output is guaranteed to remain
178 /// constant (i.e., all future outputs will be equal to the last output) as
179 /// long as its inputs remain constant.
180 ///
181 /// The exact semantics depends on the value of the `scope` argument, which
182 /// identifies the circuit whose fixed point state is being checked.
183 /// Scope 0 is the local circuit. The method is invoked with `scope=0`
184 /// at the end of a clock cycle, and should return `true` if, assuming that
185 /// it will see inputs identical to the last input during all future clock
186 /// cycles in the current clock epoch, it will keep producing the same
187 /// outputs.
188 ///
189 /// Scope 1 represents the parent of the local circuit. The method is
190 /// invoked with `scope=1` at the end of a clock _epoch_, and should
191 /// return `true` if, assuming that it will see a sequence of inputs
192 /// (aka the input stream) identical to the last epoch during all future
193 /// epochs, it will keep producing the same output streams.
194 ///
195 /// Scope 2 represents the grandparent of the local circuit. The method is
196 /// invoked with `scope=2` at the end of the parent clock _epoch_, and
197 /// checks that the operator's output will remain stable wrt to the
198 /// nested input stream (i.e., stream of streams).
199 ///
200 /// And so on.
201 ///
202 /// The check must be precise. False positives (returning `true` when the
203 /// output may change in the future) may lead to early termination before
204 /// the circuit has reached a fixed point (and hence incorrect output).
205 /// False negatives (returning `false` in a stable state) is only acceptable
206 /// for a finite number of clock cycles and will otherwise prevent the
207 /// fixedpoint computation from converging.
208 ///
209 /// # Warning
210 ///
211 /// Two operators currently violate this requirement:
212 /// [`Z1`](`crate::operator::Z1`) and
213 /// [`Z1Nested`](`crate::operator::Z1Nested`). The latter will get phased
214 /// out soon. The former is work-in-progress. It can be safely used inside
215 /// nested circuits when carrying changes to collections across iterations
216 /// of the fixed point computation, but not as part of an integrator circuit
217 /// ([`Stream::integrate`](`crate::circuit::Stream::integrate`)).
218 fn fixedpoint(&self, scope: Scope) -> bool;
219
220 /// Instructs the operator to checkpoint its state to persistent storage in
221 /// directory `base`. Any files that the operator creates should have
222 /// `persistent_id` in their names to keep them unique.
223 ///
224 /// The operator shouldn't commit the state to stable storage; rather, it
225 /// should append the files to be committed to `files` for later commit.
226 ///
227 /// For most operators this method is a no-op.
228 ///
229 /// Fails if the operator is stateful, i.e., expects a checkpoint, by
230 /// `persistent_id` is `None`
231 #[allow(unused_variables)]
232 fn checkpoint(
233 &mut self,
234 base: &StoragePath,
235 persistent_id: Option<&str>,
236 files: &mut Vec<Arc<dyn FileCommitter>>,
237 ) -> Result<(), Error> {
238 Ok(())
239 }
240
241 /// Instruct the operator to restore its state from persistent storage in
242 /// directory `base`, using `persistent_id` to find its files.
243 ///
244 /// For most operators this method is a no-op.
245 #[allow(unused_variables)]
246 fn restore(&mut self, base: &StoragePath, persistent_id: Option<&str>) -> Result<(), Error> {
247 Ok(())
248 }
249
250 /// Clear the operator's state.
251 fn clear_state(&mut self) -> Result<(), Error> {
252 Ok(())
253 }
254
255 /// Start replaying the operator's state to the replay stream.
256 ///
257 /// Only defined for operators that support replay.
258 fn start_replay(&mut self) -> Result<(), Error> {
259 panic!("start_replay() is not implemented for this operator")
260 }
261
262 /// Check if the operator has finished replaying its state.
263 ///
264 /// Only defined for operators that support replay.
265 fn is_replay_complete(&self) -> bool {
266 panic!("is_replay_complete() is not implemented for this operator")
267 }
268
269 /// Cleanup any state needed for replay and prepare the operator for normal operation.
270 ///
271 /// Only defined for operators that support replay.
272 fn end_replay(&mut self) -> Result<(), Error> {
273 panic!("end_replay() is not implemented for this operator")
274 }
275
276 /// Notify the operator about start of a transaction.
277 ///
278 /// The operator can initialize any state needed for the transaction.
279 fn start_transaction(&mut self) {}
280
281 /// Notifies the operator that all of its predecessors have produced
282 /// all outputs for the current transaction.
283 ///
284 /// Operators that wait for all inputs to arrive before producing
285 /// outputs (e.g., join, aggregate, etc.) can use this notification to
286 /// start processing inputs the next time `eval` is invoked.
287 fn flush(&mut self) {}
288
289 /// Invoked after `flush` after each `eval` call to check if all outputs
290 /// have been produced.
291 ///
292 /// Once this method returns `true`, its downstream operators can be flushed.
293 fn is_flush_complete(&self) -> bool {
294 true
295 }
296
297 /// Returns the current progress of the operator in processing the current transaction.
298 ///
299 /// Returns a best-effort estimate of the amount of work done by the operator
300 /// toward processing inputs accumulated before `flush` was called.
301 ///
302 /// Can return `None` if the operator is not in flush mode (i.e., between
303 /// `flush` was called and `is_flush_complete` returns `true`).
304 fn flush_progress(&self) -> Option<Position> {
305 None
306 }
307}
308
309/// A source operator that injects data from the outside world or from the
310/// parent circuit into the local circuit. Consumes no input streams and emits
311/// a single output stream.
312pub trait SourceOperator<O>: Operator {
313 /// Yield the next value.
314 async fn eval(&mut self) -> O;
315}
316
317/// A sink operator consumes an input stream, but does not produce an output
318/// stream. Such operators are used to send results of the computation
319/// performed by the circuit to the outside world.
320pub trait SinkOperator<I>: Operator {
321 /// Consume input by reference.
322 async fn eval(&mut self, input: &I);
323
324 /// Consume input by value.
325 async fn eval_owned(&mut self, input: I) {
326 self.eval(&input).await
327 }
328
329 /// Ownership preference on the operator's input stream
330 /// (see [`OwnershipPreference`]).
331 fn input_preference(&self) -> OwnershipPreference {
332 OwnershipPreference::INDIFFERENT
333 }
334}
335
336/// A sink operator that consumes two input streams, but does not produce
337/// an output stream. Such operators are used to send results of the
338/// computation performed by the circuit to the outside world.
339pub trait BinarySinkOperator<I1, I2>: Operator
340where
341 I1: Clone,
342 I2: Clone,
343{
344 /// Consume inputs.
345 ///
346 /// The operator must be prepared to handle any combination of
347 /// owned and borrowed inputs.
348 async fn eval<'a>(&mut self, lhs: Cow<'a, I1>, rhs: Cow<'a, I2>);
349
350 /// Ownership preference on the operator's input streams
351 /// (see [`OwnershipPreference`]).
352 fn input_preference(&self) -> (OwnershipPreference, OwnershipPreference) {
353 (
354 OwnershipPreference::INDIFFERENT,
355 OwnershipPreference::INDIFFERENT,
356 )
357 }
358}
359
360/// A sink operator that consumes three input streams, but does not produce
361/// an output stream. Such operators are used to send results of the
362/// computation performed by the circuit to the outside world.
363pub trait TernarySinkOperator<I1, I2, I3>: Operator
364where
365 I1: Clone,
366 I2: Clone,
367 I3: Clone,
368{
369 /// Consume inputs.
370 ///
371 /// The operator must be prepared to handle any combination of
372 /// owned and borrowed inputs.
373 async fn eval<'a>(&mut self, input1: Cow<'a, I1>, input2: Cow<'a, I2>, input3: Cow<'a, I3>);
374
375 /// Ownership preference on the operator's input streams
376 /// (see [`OwnershipPreference`]).
377 fn input_preference(
378 &self,
379 ) -> (
380 OwnershipPreference,
381 OwnershipPreference,
382 OwnershipPreference,
383 ) {
384 (
385 OwnershipPreference::INDIFFERENT,
386 OwnershipPreference::INDIFFERENT,
387 OwnershipPreference::INDIFFERENT,
388 )
389 }
390}
391
392/// A unary operator that consumes a stream of inputs of type `I`
393/// and produces a stream of outputs of type `O`.
394pub trait UnaryOperator<I, O>: Operator {
395 /// Consume input by reference.
396 async fn eval(&mut self, input: &I) -> O;
397
398 /// Consume input by value.
399 async fn eval_owned(&mut self, input: I) -> O {
400 self.eval(&input).await
401 }
402
403 /// Ownership preference on the operator's input stream
404 /// (see [`OwnershipPreference`]).
405 fn input_preference(&self) -> OwnershipPreference {
406 OwnershipPreference::INDIFFERENT
407 }
408}
409
410/// A binary operator consumes two input streams carrying values
411/// of types `I1` and `I2` and produces a stream of outputs of type `O`.
412pub trait BinaryOperator<I1, I2, O>: Operator {
413 /// Consume input by reference.
414 async fn eval(&mut self, lhs: &I1, rhs: &I2) -> O;
415
416 /// Consume input by value.
417 async fn eval_owned(&mut self, lhs: I1, rhs: I2) -> O {
418 self.eval(&lhs, &rhs).await
419 }
420
421 /// Consume the first input by value and the second by reference.
422 async fn eval_owned_and_ref(&mut self, lhs: I1, rhs: &I2) -> O {
423 self.eval(&lhs, rhs).await
424 }
425
426 /// Consume the first input by reference and the second by value.
427 async fn eval_ref_and_owned(&mut self, lhs: &I1, rhs: I2) -> O {
428 self.eval(lhs, &rhs).await
429 }
430
431 /// Ownership preference on the operator's input streams
432 /// (see [`OwnershipPreference`]).
433 fn input_preference(&self) -> (OwnershipPreference, OwnershipPreference) {
434 (
435 OwnershipPreference::INDIFFERENT,
436 OwnershipPreference::INDIFFERENT,
437 )
438 }
439}
440
441/// A ternary operator consumes three input streams carrying values
442/// of types `I1`, `I2`, and `I3` and produces a stream of outputs of type `O`.
443pub trait TernaryOperator<I1, I2, I3, O>: Operator
444where
445 I1: Clone,
446 I2: Clone,
447 I3: Clone,
448{
449 /// Consume inputs.
450 ///
451 /// The operator must be prepared to handle any combination of
452 /// owned and borrowed inputs.
453 async fn eval(&mut self, i1: Cow<'_, I1>, i2: Cow<'_, I2>, i3: Cow<'_, I3>) -> O;
454
455 fn input_preference(
456 &self,
457 ) -> (
458 OwnershipPreference,
459 OwnershipPreference,
460 OwnershipPreference,
461 ) {
462 (
463 OwnershipPreference::INDIFFERENT,
464 OwnershipPreference::INDIFFERENT,
465 OwnershipPreference::INDIFFERENT,
466 )
467 }
468}
469
470/// A quaternary operator consumes four input streams carrying values
471/// of types `I1`, `I2`, `I3`, and `I4` and produces a stream of outputs of type
472/// `O`.
473pub trait QuaternaryOperator<I1, I2, I3, I4, O>: Operator
474where
475 I1: Clone,
476 I2: Clone,
477 I3: Clone,
478 I4: Clone,
479{
480 /// Consume inputs.
481 ///
482 /// The operator must be prepared to handle any combination of
483 /// owned and borrowed inputs.
484 async fn eval(
485 &mut self,
486 i1: Cow<'_, I1>,
487 i2: Cow<'_, I2>,
488 i3: Cow<'_, I3>,
489 i4: Cow<'_, I4>,
490 ) -> O;
491
492 fn input_preference(
493 &self,
494 ) -> (
495 OwnershipPreference,
496 OwnershipPreference,
497 OwnershipPreference,
498 OwnershipPreference,
499 ) {
500 (
501 OwnershipPreference::INDIFFERENT,
502 OwnershipPreference::INDIFFERENT,
503 OwnershipPreference::INDIFFERENT,
504 OwnershipPreference::INDIFFERENT,
505 )
506 }
507}
508
509/// An operator that consumes any number of streams carrying values
510/// of type `I` and produces a stream of outputs of type `O`.
511pub trait NaryOperator<I, O>: Operator
512where
513 I: Clone + 'static,
514{
515 /// Consume inputs.
516 ///
517 /// The operator must be prepared to handle any combination of
518 /// owned and borrowed inputs.
519 async fn eval<'a, Iter>(&'a mut self, inputs: Iter) -> O
520 where
521 Iter: Iterator<Item = Cow<'a, I>>;
522
523 /// Ownership preference on the operator's input streams
524 /// (see [`OwnershipPreference`]).
525 fn input_preference(&self) -> OwnershipPreference {
526 OwnershipPreference::INDIFFERENT
527 }
528}
529
530/// A "strict operator" is one whose output only depends on inputs from previous
531/// timestamps and hence can be produced before consuming new inputs. This way
532/// a strict operator can be used as part of a feedback loop where its output is
533/// needed before input for the current timestamp is available.
534///
535/// The only strict operators that DBSP makes available are [Z1] and its variant
536/// [Z1Nested].
537///
538/// [Z1]: crate::operator::Z1
539/// [Z1Nested]: crate::operator::Z1Nested
540/// [Z1Trace]: crate::operator::dynamic::trace::Z1Trace
541pub trait StrictOperator<O>: Operator {
542 /// Returns the output value computed based on data consumed by the operator
543 /// during previous timestamps. This method is invoked **before**
544 /// `eval_strict()` has been invoked for the current timestamp. It can
545 /// be invoked **at most once** for each timestamp,
546 /// as the implementation may mutate or destroy the operator's internal
547 /// state (for example [Z1](`crate::operator::Z1`) returns its inner
548 /// value, leaving the operator empty).
549 fn get_output(&mut self) -> O;
550
551 fn get_final_output(&mut self) -> O;
552}
553
554/// A strict unary operator that consumes a stream of inputs of type `I`
555/// by reference and produces a stream of outputs of type `O`.
556pub trait StrictUnaryOperator<I, O>: StrictOperator<O> {
557 /// Feed input for the current timestamp to the operator by reference. The
558 /// output will be consumed via
559 /// [`get_output`](`StrictOperator::get_output`) during the
560 /// next timestamp.
561 async fn eval_strict(&mut self, input: &I);
562
563 /// Feed input for the current timestamp to the operator by value. The
564 /// output will be consumed via
565 /// [`get_output`](`StrictOperator::get_output`) during the
566 /// next timestamp.
567 async fn eval_strict_owned(&mut self, input: I) {
568 self.eval_strict(&input).await
569 }
570
571 /// Ownership preference on the operator's input stream
572 /// (see [`OwnershipPreference`]).
573 fn input_preference(&self) -> OwnershipPreference {
574 OwnershipPreference::INDIFFERENT
575 }
576}
577
578/// An import operator makes a stream from the parent circuit
579/// available inside a subcircuit.
580///
581/// Import operators are the only kind of operator that span
582/// two clock domains: an import operator reads a single
583/// value from the parent stream per parent clock tick and produces
584/// a stream of outputs in the nested circuit, one for each nested
585/// clock tick.
586///
587/// See [`Delta0`](`crate::operator::Delta0`) for a concrete example
588/// of an import operator.
589pub trait ImportOperator<I, O>: Operator {
590 /// Consumes a value from the parent stream by reference.
591 ///
592 /// Either `import` or [`Self::import_owned`] is invoked once per
593 /// nested clock epoch, right after `clock_start(0)`.
594 fn import(&mut self, val: &I);
595
596 /// Consumes a value from the parent stream by value.
597 fn import_owned(&mut self, val: I);
598
599 /// Invoked once per nested clock cycle to write a value to
600 /// the output stream.
601 async fn eval(&mut self) -> O;
602
603 /// Ownership preference on the operator's input stream
604 /// (see [`OwnershipPreference`]).
605 fn input_preference(&self) -> OwnershipPreference {
606 OwnershipPreference::INDIFFERENT
607 }
608}