dbsp/circuit/
circuit_builder.rs

1//! API to construct circuits.
2//!
3//! The API exposes two abstractions: "circuits" and "streams", where a circuit
4//! is a possibly nested dataflow graph that consists of operators connected by
5//! streams:
6//!
7//!   * Circuits are represented by the [`Circuit`] trait, which has a single
8//!     implementation [`ChildCircuit<P>`], where `P` is the type of the parent
9//!     circuit.  For a root circuit, `P` is `()`, so the API provides
10//!     [`RootCircuit`] as a synonym for `ChildCircuit<()>`).
11//!
12//!     Use [`RootCircuit::build`] to create a new root circuit.  It takes a
13//!     user-provided callback, which it calls to set up operators and streams
14//!     inside the circuit, and then it returns the circuit. The circuit's
15//!     structure is fixed at construction and can't be changed afterward.
16//!
17//!   * Streams are represented by struct [`Stream<C, D>`], which is a stream
18//!     that carries values of type `D` within a circuit of type `C`.  Methods
19//!     and traits on `Stream` are the main way to assemble the structure of a
20//!     circuit within the [`RootCircuit::build`] callback.
21//!
22//! The API that this directly exposes runs the circuit in the context of the
23//! current thread.  To instead run the circuit in a collection of worker
24//! threads, use [`Runtime::init_circuit`].
25use crate::{
26    Error as DbspError, Position, Runtime,
27    circuit::{
28        cache::{CircuitCache, CircuitStoreMarker},
29        fingerprinter::Fingerprinter,
30        metadata::OperatorMeta,
31        metrics::DBSP_OPERATOR_COMMIT_LATENCY_MICROSECONDS,
32        operator_traits::{
33            BinaryOperator, BinarySinkOperator, Data, ImportOperator, NaryOperator,
34            QuaternaryOperator, SinkOperator, SourceOperator, StrictUnaryOperator, TernaryOperator,
35            TernarySinkOperator, UnaryOperator,
36        },
37        runtime::Consensus,
38        schedule::{
39            CommitProgress, DynamicScheduler, Error as SchedulerError, Executor, IterativeExecutor,
40            OnceExecutor, Scheduler,
41        },
42        trace::{CircuitEvent, SchedulerEvent},
43    },
44    circuit_cache_key,
45    ir::LABEL_MIR_NODE_ID,
46    operator::dynamic::balance::{Balancer, BalancerError, BalancerHint, PartitioningPolicy},
47    time::{Timestamp, UnitTimestamp},
48};
49#[cfg(doc)]
50use crate::{
51    InputHandle, OutputHandle,
52    algebra::{IndexedZSet, ZSet},
53    operator::{Aggregator, Fold, Generator, Max, Min, time_series::RelRange},
54    trace::Batch,
55};
56use anyhow::Error as AnyError;
57use dyn_clone::{DynClone, clone_box};
58use feldera_ir::{LirCircuit, LirNodeId};
59use feldera_storage::{FileCommitter, StoragePath};
60use serde::{Deserialize, Serialize, Serializer, de::DeserializeOwned};
61use std::{
62    any::{Any, TypeId, type_name_of_val},
63    borrow::Cow,
64    cell::{Ref, RefCell, RefMut},
65    collections::{BTreeMap, BTreeSet, HashMap},
66    fmt::{self, Debug, Display, Write},
67    future::Future,
68    io::ErrorKind,
69    marker::PhantomData,
70    mem::transmute,
71    ops::Deref,
72    panic::Location,
73    pin::Pin,
74    rc::Rc,
75    sync::Arc,
76    thread::panicking,
77};
78use tokio::{runtime::Runtime as TokioRuntime, task::LocalSet};
79use tracing::debug;
80use typedmap::{TypedMap, TypedMapKey};
81
82use super::dbsp_handle::Mode;
83
84/// Label name used to store operator's persistent id,
85/// i.e., id stable across circuit modifications.
86const LABEL_PERSISTENT_OPERATOR_ID: &str = "persistent_id";
87
88/// Value stored in the stream.
89struct StreamValue<D> {
90    /// Value written to the stream at the current clock cycle;
91    /// `None` after the last consumer has retrieved the value from the stream.
92    val: Option<D>,
93
94    /// The number of consumers connected to the stream.  Each consumer
95    /// reads from the stream exactly once at every clock cycle.
96    ///
97    /// Controlled by the scheduler via `register_consumer` and `clear_consumer_count`.
98    consumers: usize,
99
100    /// The number of remaining consumers still expected to read from the stream
101    /// at the current clock cycle.  This value is reset to `consumers` when
102    /// a new value is written to the stream.  It is decremented on each access.
103    /// The last consumer to read from the stream (`tokens` drops to 0) obtains
104    /// an owned value rather than a borrow.  See description of
105    /// [ownership-aware scheduling](`OwnershipPreference`) for details.
106    tokens: RefCell<usize>,
107}
108
109impl<D> StreamValue<D> {
110    const fn empty() -> Self {
111        Self {
112            val: None,
113            consumers: 0,
114            tokens: RefCell::new(0),
115        }
116    }
117
118    fn put(&mut self, val: D) {
119        // Check that stream contents was consumed at the last clock cycle.
120        // This isn't strictly necessary for correctness, but can indicate a
121        // scheduling or token counting error.
122        debug_assert!(self.val.is_none());
123
124        // If the stream is not connected to any consumers, drop the output
125        // on the floor.
126        if self.consumers > 0 {
127            self.tokens = RefCell::new(self.consumers);
128            self.val = Some(val);
129        }
130    }
131
132    /// Returns a reference to the value.
133    fn peek<R>(this: &R) -> &D
134    where
135        R: Deref<Target = Self>,
136    {
137        debug_assert_ne!(*this.tokens.borrow(), 0);
138
139        this.val.as_ref().unwrap()
140    }
141
142    /// Returns the owned value, leaving `this` empty iff the number of remaining
143    /// tokens is 1; returns None otherwise.
144    fn take(this: &RefCell<Self>) -> Option<D>
145    where
146        D: Clone,
147    {
148        let tokens = *this.borrow().tokens.borrow();
149        debug_assert_ne!(tokens, 0);
150
151        if tokens == 1 {
152            Some(this.borrow_mut().val.take().unwrap())
153        } else {
154            None
155        }
156    }
157
158    /// Must be called exactly once by each consumer of the stream at each clock cycle,
159    /// when the consumer has finished processing the contents of the stream. The consumer
160    /// is not allowed to access the value after calling this function. This guarantees that,
161    /// once the number of tokens drops to 1, only one active consumer remains and that
162    /// consumer can retrieve the value using `Self::take`.
163    fn consume_token(this: &RefCell<Self>) {
164        let this_ref = this.borrow();
165        debug_assert_ne!(*this_ref.tokens.borrow(), 0);
166        *this_ref.tokens.borrow_mut() -= 1;
167        if *this_ref.tokens.borrow() == 0 {
168            // We're the last consumer. It's now safe to take a mutable reference to `this`.
169            drop(this_ref);
170            this.borrow_mut().val.take();
171        }
172    }
173}
174
175#[repr(transparent)]
176pub struct RefStreamValue<D>(Rc<RefCell<StreamValue<D>>>);
177
178impl<D> Clone for RefStreamValue<D> {
179    fn clone(&self) -> Self {
180        Self(self.0.clone())
181    }
182}
183
184impl<D> RefStreamValue<D> {
185    pub fn empty() -> Self {
186        Self(Rc::new(RefCell::new(StreamValue::empty())))
187    }
188
189    fn get_mut(&self) -> RefMut<'_, StreamValue<D>> {
190        self.0.borrow_mut()
191    }
192
193    fn get(&self) -> Ref<'_, StreamValue<D>> {
194        self.0.borrow()
195    }
196
197    /// Put a new value in the stream.
198    ///
199    /// # Panics
200    ///
201    /// Panics if someone is holding a reference to the stream value.
202    pub fn put(&self, d: D) {
203        let mut val = self.get_mut();
204        val.put(d);
205    }
206
207    unsafe fn transmute<D2>(&self) -> RefStreamValue<D2> {
208        unsafe {
209            RefStreamValue(std::mem::transmute::<
210                Rc<RefCell<StreamValue<D>>>,
211                Rc<RefCell<StreamValue<D2>>>,
212            >(self.0.clone()))
213        }
214    }
215}
216
217/// An object-safe interface to a stream.
218///
219/// The `Stream` type is parameterized with circuit and content types, and is not object-safe.
220/// This abstract trait abstracts away those types, allowing to pass streams around as trait objects.
221pub trait StreamMetadata: DynClone + 'static {
222    fn stream_id(&self) -> StreamId;
223    fn local_node_id(&self) -> NodeId;
224    fn origin_node_id(&self) -> &GlobalNodeId;
225    fn num_consumers(&self) -> usize;
226
227    /// Resets consumer count to 0.
228    fn clear_consumer_count(&self);
229
230    /// Invoked by the scheduler exactly once for each consumer operator attached
231    /// to the stream.
232    fn register_consumer(&self);
233}
234
235dyn_clone::clone_trait_object!(StreamMetadata);
236
237/// A `Stream<C, D>` stores the output value of type `D` of an operator in a
238/// circuit with type `C`.
239///
240/// Circuits are synchronous, meaning that each value is produced and consumed
241/// in the same clock cycle, so there can be at most one value in the stream at
242/// any time.
243///
244/// The value type `D` may be any type, although most `Stream` methods impose
245/// additional requirements.  Since a stream must yield one data item per clock
246/// cycle, the rate at which data arrives is important to the choice of type.
247/// If, for example, an otherwise scalar input stream might not have new data on
248/// every cycle, an `Option` type could represent that, and to batch multiple
249/// pieces of data in a single step, one might use [`Vec`] or another collection
250/// type.
251///
252/// In practice, `D` is often a special collection type called an "indexed
253/// Z-set", represented as trait [`IndexedZSet`].  An indexed Z-set is
254/// conceptually a set of `(key, value, weight)` tuples.  Indexed Z-sets have a
255/// specialization called a "non-indexed Z-set" ([`ZSet`]) that contains `key`
256/// and `weight` only.  Indexed and non-indexed Z-sets are both subtraits of a
257/// higher-level [`Batch`] trait.  Many operators on streams work only with an
258/// indexed or non-indexed Z-set or another batch type as `D`.
259///
260/// # Data streams versus delta streams
261///
262/// A value in a `Stream` can represent data, or it can represent a delta (also
263/// called an "update").  Most streams carry data types that could have either
264/// meaning.  In particular, a stream of indexed or non-indexed Z-sets can carry
265/// either:
266///
267///   * In a stream of data, the `weight` indicates the multiplicity of a key. A
268///     negative `weight` has no natural interpretation and might indicate a
269///     bug.
270///
271///   * In a stream of deltas or updates, a positive `weight` represents
272///     insertions and a negative `weight` represents deletions.
273///
274/// There's no way to distinguish a data stream from a delta stream from just
275/// the type of the `Stream` since, as described above, a stream of Z-sets can
276/// be either one.  Some operators make sense for either kind of stream; for
277/// example, adding streams of Z-sets with [`plus`](`Stream::plus`) works
278/// equally well in either case, or even for adding a delta to data.  But other
279/// operations make sense only for streams of data or only for streams of
280/// deltas.  In these cases, `Stream` often provides an operator for each type
281/// of stream, and the programmer must choose the right one, since the types
282/// themselves don't help.
283///
284/// `Stream` refers to operators specifically for streams of data as
285/// "nonincremental".  These operators, which have `stream` in their name,
286/// e.g. `stream_join`, take streams of data as input and produce one as output.
287/// They act as "lifted scalar operators" that don't maintain state across
288/// invocations and only act on their immediate inputs, that is, each output is
289/// produced by independently applying the operator to the individual inputs
290/// received in the current step:
291///
292/// ```text
293///       ┌─────────────┐
294/// a ───►│    non-     │
295///       │ incremental ├───► c
296/// b ───►│  operator   │
297///       └─────────────┘
298/// ```
299///
300/// `Stream` refers to operators meant for streams of deltas as "incremental".
301/// These operators take streams of deltas as input and produces a stream of
302/// deltas as output.  Such operators could be implemented, inefficiently, in
303/// terms of a nonincremental version by putting an integration operator before
304/// each input and a differentiation operator after the output:
305///
306/// ```text
307///        ┌───┐      ┌─────────────┐
308/// Δa ───►│ I ├─────►│             │
309///        └───┘      │    non-     │    ┌───┐
310///                   │ incremental ├───►│ D ├───► Δc
311///        ┌───┐      │  operator   │    └───┘
312/// Δb ───►│ I ├─────►│             │
313///        └───┘      └─────────────┘
314/// ```
315///
316/// # Operator naming convention
317///
318/// `Stream` uses `_index` and `_generic` suffixes and
319/// `stream_` prefix to declare variations of basic operations, e.g., `map`,
320/// `map_index`, `map_generic`, `map_index_generic`, `join`, `stream_join`:
321///
322///   * `stream_` prefix: This prefix indicates that the operator is
323///     "nonincremental", that is, that it works with streams of data, rather
324///     than streams of deltas (see [Data streams versus delta streams], above).
325///
326///     [Data streams versus delta streams]:
327///     Stream#data-streams-versus-delta-streams
328///
329///   * `_generic` suffix: Most operators can assemble their outputs into any
330///     collection type that implements the [`Batch`] trait.  In practice, we
331///     typically use [`OrdIndexedZSet`](`crate::OrdIndexedZSet`) for indexed
332///     batches and [`OrdZSet`](`crate::OrdZSet`) for non-indexed batches.
333///     Methods without the `_generic` suffix return these concrete types,
334///     eliminating the need to type-annotate each invocation, while `_generic`
335///     methods can be used to return arbitrary custom `Batch` implementations.
336///
337///   * `_index` suffix: Methods without the `_index` suffix return non-indexed
338///     batches.  `<method>_index` methods combine the effects of `<method>` and
339///     [`index`](Self::index), e.g., `stream.map_index(closure)` is
340///     functionally equivalent, but more efficient than,
341///     `stream.map(closure).index()`.
342///
343/// # Catalog of stream operators
344///
345/// `Stream` methods are the main way to perform
346/// computations with streams.  The number of available methods can be
347/// overwhelming, so the subsections below categorize them into functionally
348/// related groups.
349///
350/// ## Input operators
351///
352/// Most streams are obtained via methods or traits that operate on `Stream`s.
353/// The input operators create the initial input streams for these other methods
354/// to work with.
355///
356/// [`Circuit::add_source`] is the fundamental way to add an input stream.
357/// Using it directly makes sense for cases like generating input using a
358/// function (perhaps using [`Generator`]) or reading data from a file.  More
359/// commonly, [`RootCircuit`] offers the `add_input_*` functions as convenience
360/// wrappers for `add_source`.  Each one returns a tuple of:
361///
362///   * A `Stream` that can be attached as input to operators in the circuit
363///     (within the constructor function passed to `RootCircuit::build` only).
364///
365///   * An input handle that can be used to add input to the stream from outside
366///     the circuit.  In a typical scenario, the closure passed to build will
367///     return all of its input handles, which are used at runtime to feed new
368///     inputs to the circuit at each step.  Different functions return
369///     different kinds of input handles.
370///
371/// Use [`RootCircuit::add_input_indexed_zset`] or
372/// [`RootCircuit::add_input_zset`] to create an (indexed) Z-set input
373/// stream. There's also [`RootCircuit::add_input_set`] and
374/// [`RootCircuit::add_input_map`] to simplify cases where a regular set or
375/// map is easier to use than a Z-set.  The latter functions maintain an extra
376/// internal table tracking the contents of the set or map, so they're a second
377/// choice.
378///
379/// For special cases, there's also [`RootCircuit::add_input_stream<T>`].  The
380/// [`InputHandle`] that it returns needs to be told which workers to feed the
381/// data to, which makes it harder to use.  It might be useful for feeding
382/// non-relational data to the circuit, such as the current physical time.  DBSP
383/// does not know how to automatically distribute such values across workers,
384/// so the caller must decide whether to send the value to one specific worker
385/// or to broadcast it to everyone.
386///
387/// It's common to pass explicit type arguments to the functions that
388/// create input streams, e.g.:
389///
390/// ```ignore
391/// circuit.add_input_indexed_zset::<KeyType, ValueType, WeightType>()
392/// ```
393///
394/// ## Output and debugging operators
395///
396/// There's not much value in computations whose output can't be seen in the
397/// outside world.  Use [`Stream::output`] to obtain an [`OutputHandle`] that
398/// exports a stream's data.  The constructor function passed to
399/// [`RootCircuit::build`] should return the `OutputHandle` (in addition to all
400/// the input handles as described above).  After each step, the client code
401/// should take the new data from the `OutputHandle`, typically by calling
402/// [`OutputHandle::consolidate`].
403///
404/// Use [`Stream::inspect`] to apply a callback function to each data item in a
405/// stream.  The callback function has no return value and is executed only for
406/// its side effects, such as printing the data item to stdout.  The `inspect`
407/// operator yields the same stream on its output.
408///
409/// It is not an operator, but [`Circuit::region`] can help with debugging by
410/// grouping operators into a named collection.
411///
412/// ## Record-by-record mapping operators
413///
414/// These operators map one kind of batch to another, allowing the client to
415/// pass in a function that looks at individual records in a Z-set or other
416/// batch.  These functions apply to both streams of deltas and streams of data.
417///
418/// The following methods are available for streams of indexed and non-indexed
419/// Z-sets.  Each of these takes a function that accepts a key (for non-indexed
420/// Z-sets) or a key-value pair (for indexed Z-sets):
421///
422///   * Use [`Stream::map`] to output a non-indexed Z-set using an
423///     arbitrary mapping function, or [`Stream::map_index`] to map to
424///     an indexed Z-set.
425///
426///   * Use [`Stream::filter`] to drop records that do not satisfy a
427///     predicate function.  The output stream has the same type as the input.
428///
429///   * Use [`Stream::flat_map`] to output a Z-set that maps each
430///     input record to any number of records, or
431///     [`Stream::flat_map_index`] for indexed Z-set output.  These
432///     methods also work as a `filter_map` equivalent.
433///
434/// ## Value-by-value mapping operators
435///
436/// Sometimes it makes sense to map a stream's whole data item instead of
437/// breaking Z-sets down into records.  Unlike the record-by-record functions,
438/// these functions works with streams that carry a type other than a Z-set or
439/// batch.  These functions apply to both streams of deltas and streams of data.
440///
441/// Use [`Stream::apply`] to apply a mapping function to each item in a given
442/// stream.  [`Stream::apply_named`], [`Stream::apply_owned`], and
443/// [`Stream::apply_owned_named`] offer small variations.
444///
445/// Use [`Stream::apply2`] or [`Stream::apply2_owned`] to apply a mapping
446/// function to pairs of items drawn from two different input streams.
447///
448/// ## Addition and subtraction operators
449///
450/// Arithmetic operators work on Z-sets (and batches) by operating on weights.
451/// For example, adding two Z-sets adds the weights of records with the same
452/// key-value pair.  They also work on streams of arithmetic types.
453///
454/// Use [`Stream::neg`] to map a stream to its negation, that is, to negate the
455/// weights for a Z-set.
456///
457/// Use [`Stream::plus`] to add two streams and [`Stream::sum`] to add an
458/// arbitrary number of streams.  Use [`Stream::minus`] to subtract streams.
459///
460/// There aren't any multiplication or division operators, because there is no
461/// clear interpretation of them for Z-sets.  You can use [`Stream::apply`] and
462/// [`Stream::apply2`], as already described, to do arbitrary arithmetic on one
463/// or two streams of arithmetic types.
464///
465/// ## Stream type conversion operators
466///
467/// These operators convert among streams of deltas, streams of data, and
468/// streams of "upserts".  Client code can use them, but they're more often
469/// useful for testing (for example, for checking that incremental operators are
470/// equivalent to non-incremental ones) or for building other operators.
471///
472/// Use [`Stream::integrate`] to sum up the values within an input stream.  The
473/// first output value is the first input value, the second output value is the
474/// sum of the first two inputs, and so on.  This effectively converts a stream
475/// of deltas into a stream of data.
476///
477/// [`Stream::stream_fold`] generalizes integration.  On each step, it calls a
478/// function to fold the input value with an accumulator and outputs the
479/// accumulator.  The client provides the function and the initial value of the
480/// accumulator.
481///
482/// Use [`Stream::differentiate`] to calculate differences between subsequent
483/// values in an input stream.  The first output is the first input value, the
484/// second output is the second input value minus the first input value, and so
485/// on.  This effectively converts a stream of data into a stream of deltas.
486/// You shouldn't ordinarily need this operator, at least not for Z-sets,
487/// because DBSP operators are fully incremental.
488///
489/// Use [`Stream::upsert`] to convert a stream of "upserts" into a stream of
490/// deltas.  The input stream carries "upserts", or assignments of values to
491/// keys such that a subsequent assignment to a key assigned earlier replaces
492/// the earlier value.  `upsert` turns these into a stream of deltas by
493/// internally tracking upserts that have already been seen.
494///
495/// ## Weighting and counting operators
496///
497/// Use [`Stream::dyn_weigh`] to multiply the weights in an indexed Z-set by a
498/// user-provided function of the key and value.  This is equally appropriate
499/// for streams of data or deltas.  This method outputs a non-indexed Z-set with
500/// just the keys from the input, discarding the values, which also means that
501/// weights will be added together in the case of equal input keys.
502///
503/// `Stream` provides methods to count the number of values per key in an
504/// indexed Z-set:
505///
506///   * To sum the weights for each value within a key, use
507///     [`Stream::dyn_weighted_count`] for streams of deltas or
508///     [`Stream::dyn_stream_weighted_count`] for streams of data.
509///
510///   * To count each value only once even for a weight greater than one, use
511///     [`Stream::dyn_distinct_count`] for streams of deltas or
512///     [`Stream::dyn_stream_distinct_count`] for streams of data.
513///
514/// The "distinct" operator on a Z-set maps positive weights to 1 and all other
515/// weights to 0.  `Stream` has two implementations:
516///
517///   * Use [`Stream::distinct`] to incrementally process a stream of deltas. If
518///     the output stream were to be integrated, it only contain records with
519///     weight 0 or 1.  This operator internally integrates the stream of
520///     deltas, which means its memory consumption is proportional to the
521///     integrated data size.
522///
523///   * Use [`Stream::stream_distinct`] to non-incrementally process a stream of
524///     data.  It sets each record's weight to 1 if it is positive and drops the
525///     others.
526///
527/// ## Join on equal keys
528///
529/// A DBSP equi-join takes batches `a` and `b` as input, finds all pairs of a
530/// record in `a` and a record in `b` with the same key, applies a given
531/// function `F` to those records' key and values, and outputs a Z-set with
532/// `F`'s output.
533///
534/// DBSP implements two kinds of joins:
535///
536///   * Joins of delta streams ("incremental" joins) for indexed Z-sets with the
537///     same key type.  Use [`Stream::join`] for non-indexed Z-set output, or
538///     [`Stream::join_index`] for indexed Z-set output.
539///
540///   * Joins of data streams ("nonincremental" joins), which work with any
541///     indexed batches.  Use [`Stream::stream_join`], which outputs a
542///     non-indexed Z-set.
543///
544///     `stream_join` also works for joining a stream of deltas with an
545///     invariant stream of data where the latter is used as a lookup table.
546///
547///     If the output of the join function grows monotonically as `(k, v1, v2)`
548///     tuples are fed to it in lexicographic order, then
549///     [`Stream::monotonic_stream_join`] is more efficient.  One such monotonic
550///     function is a join function that returns `(k, v1, v2)` itself.
551///
552/// One way to implement a Cartesian product is to map unindexed Z-set inputs
553/// into indexed Z-sets with a unit key type, e.g. `input.index_with(|k| ((),
554/// k))`, and then use `join` or `stream_join`, as appropriate.
555///
556/// ## Other kinds of joins
557///
558/// Use [`Stream::antijoin`] for antijoins of delta streams.  It takes indexed
559/// Z-set `a` and Z-set `b` with the same key type and yields the subset of `a`
560/// whose keys do not appear in `b`.  `b` may be indexed or non-indexed and its
561/// value type does not matter.
562///
563/// Use [`Stream::dyn_semijoin_stream`] for semi-joins of data streams.  It
564/// takes a batch `a` and non-indexed batch `b` with the same key type as `a`.
565/// It outputs a non-indexed Z-set of key-value tuples that contains all the
566/// pairs from `a` for which a key appears in `b`.
567///
568/// Use [`Stream::outer_join`] or [`Stream::outer_join_default`] for outer joins
569/// of delta streams.  The former takes three functions, one for each of the
570/// cases (common keys, left key only, right key only), and the latter
571/// simplifies it by taking only a function for common keys and passing in the
572/// default for the missing value.
573///
574/// DBSP implements "range join" of data streams, which joins keys in `a`
575/// against ranges of keys in `b`.  [`Stream::dyn_stream_join_range`] implements
576/// range join with non-indexed Z-set output,
577/// [`Stream::dyn_stream_join_range_index`] with indexed output.
578///
579/// ## Aggregation
580///
581/// Aggregation applies a function (the "aggregation function") to all of the
582/// values for a given key in an input stream, and outputs an indexed Z-set with
583/// the same keys as the input and the function's output as values.  The output
584/// of aggregation usually has fewer records than its input, because it outputs
585/// only one record per input key, regardless of the number of key-value pairs
586/// with that key.
587///
588/// DBSP implements two kinds of aggregation:
589///
590///   * [`Stream::dyn_aggregate`] aggregates delta streams.  It takes an
591///     aggregation function as an [`Aggregator`], e.g. [`Min`], [`Max`],
592///     [`Fold`], or one written by the client.
593///
594///     [`Stream::dyn_aggregate_linear`] is cheaper for linear aggregation
595///     functions.  It's also a little easier to use with a custom aggregation
596///     function, because it only takes a function rather than an [`Aggregator`]
597///     object.
598///
599///     [`Stream::dyn_average`] calculates the average over the values for each
600///     key.
601///
602///   * [`Stream::dyn_stream_aggregate`] aggregates data streams.  Each batch
603///     from the input is separately aggregated and written to the output
604///     stream.
605///
606///     [`Stream::dyn_stream_aggregate_linear`] applies a linear aggregation
607///     function to a data stream.
608///
609/// These aggregation functions all partition the aggregation by key, like GROUP
610/// BY in SQL.  To aggregate all records in a non-indexed Z-set, map to an
611/// indexed Z-set with a unit key `()` before aggregating, then map again to
612/// remove the index if necessary, e.g.:
613///
614/// ```ignore
615/// let max_auction_count = auction_counts
616///     .map_index(|(_auction, count)| ((), *count))
617///     .aggregate(Max)
618///     .map(|((), max_count)| *max_count);
619/// ```
620///
621/// ## Rolling aggregates
622///
623/// DBSP supports rolling aggregation of time series data over a
624/// client-specified "rolling window" range.  For this purpose, Rust unsigned
625/// integer types model times, larger integers corresponding to later times.
626/// The unit of time in use is relevant only for specifying the width of the
627/// aggregation window, with [`RelRange`].
628///
629/// The DBSP logical time concept is unrelated to times used in rolling
630/// aggregation and other time-series operators. The former is used to establish
631/// the ordering in which updates are consumed by DBSP, while the latter model
632/// physical times when the corresponding events were generated, observed, or
633/// processed. In particular, the ordering of physical and logical timestamps
634/// doesn't have to match. In other words, DBSP can process events out-of-order.
635///
636/// Rolling aggregation takes place within a "partition", which is any
637/// convenient division of the data.  It might correspond to a tenant ID, for
638/// example, if each tenant's data is to be separately aggregated.  To represent
639/// partitioning, rolling aggregation introduces the
640/// [`OrdPartitionedIndexedZSet`](`crate::OrdPartitionedIndexedZSet`)
641/// type, which is an `IndexedZSet` with an arbitrary key type that specifies
642/// the partition (it may be `()` if all data is to be within a single
643/// partition) and a value type of the form `(TS, V)` where `TS` is the type
644/// used for time and `V` is the client's value type.
645///
646/// Rolling aggregation does not reduce the size of data.  It outputs one record
647/// for each input record.
648///
649/// DBSP has two kinds of rolling aggregation functions that differ based on
650/// their tolerance for updating aggregation results when new data arrives for
651/// an old moment in time:
652///
653///   * If the application must tolerate data arriving entirely out-of-order,
654///     use [`Stream::partitioned_rolling_aggregate`].  It operates on a
655///     `PartitionedIndexedZSet` and takes an [`Aggregator`] and a [`RelRange`]
656///     that specifies the span of the window.  It returns another
657///     `PartitionedIndexedZSet` with the results.  This operator must buffer
658///     old data indefinitely since old output is always subject to revision.
659///
660///     [`Stream::partitioned_rolling_aggregate_linear`] is cheaper for linear
661///     aggregation functions.
662///
663///     [`Stream::partitioned_rolling_average`] calculates the rolling average
664///     over a partition.
665///
666///   * If the application can discard data that arrives too out-of-order, use
667///     [`Stream::partitioned_rolling_aggregate_with_waterline`], which can be
668///     more memory-efficient.  This form of rolling aggregation requires a
669///     "waterline" stream, which is a stream of times (scalars, not batches or
670///     Z sets) that reports the earliest time that can be updated.  Use
671///     [`Stream::waterline_monotonic`] to conveniently produce the waterline
672///     stream.
673///
674///     [`Stream::partitioned_rolling_aggregate_with_waterline`] operates on an
675///     `IndexedZSet` and, in addition to the aggregrator, range, and waterline
676///     stream, it takes a function to map a record to a partition. It discards
677///     input before the waterline, partitions it, aggregates it, and returns
678///     the result as a `PartitionedIndexedZSet`.
679///
680/// ## Windowing
681///
682/// Use [`Stream::dyn_window`] to extract a stream of deltas to windows from a
683/// stream of deltas.  This can be useful for windowing outside the context of
684/// rolling aggregation.
685pub struct Stream<C, D> {
686    /// Globally unique ID of the stream.
687    stream_id: StreamId,
688    /// Id of the operator within the local circuit that writes to the stream.
689    local_node_id: NodeId,
690    /// Global id of the node that writes to this stream.
691    origin_node_id: GlobalNodeId,
692    /// Circuit that this stream belongs to.
693    circuit: C,
694    /// Value stored in the stream (there can be at most one since our
695    /// circuits are synchronous).
696    val: RefStreamValue<D>,
697}
698
699impl<C, D> StreamMetadata for Stream<C, D>
700where
701    C: Clone + 'static,
702    D: 'static,
703{
704    fn stream_id(&self) -> StreamId {
705        self.stream_id
706    }
707    fn local_node_id(&self) -> NodeId {
708        self.local_node_id
709    }
710    fn origin_node_id(&self) -> &GlobalNodeId {
711        &self.origin_node_id
712    }
713    fn clear_consumer_count(&self) {
714        self.val.get_mut().consumers = 0;
715    }
716    fn num_consumers(&self) -> usize {
717        self.val.get().consumers
718    }
719    fn register_consumer(&self) {
720        self.val.get_mut().consumers += 1;
721    }
722}
723
724impl<C, D> Clone for Stream<C, D>
725where
726    C: Clone,
727{
728    fn clone(&self) -> Self {
729        Self {
730            stream_id: self.stream_id,
731            local_node_id: self.local_node_id,
732            origin_node_id: self.origin_node_id.clone(),
733            circuit: self.circuit.clone(),
734            val: self.val.clone(),
735        }
736    }
737}
738
739impl<C, D> Stream<C, D>
740where
741    C: Clone,
742{
743    /// Transmute a stream of `D` into a stream of `D2`.
744    ///
745    /// This is unsafe and dangerous for the same reasons [`std::mem:transmute`]
746    /// is dangerous and should be used with care.
747    ///
748    /// # Safety
749    ///
750    /// Transmuting `D` into `D2` should be safe.
751    pub(crate) unsafe fn transmute_payload<D2>(&self) -> Stream<C, D2> {
752        unsafe {
753            Stream {
754                stream_id: self.stream_id,
755                local_node_id: self.local_node_id,
756                origin_node_id: self.origin_node_id.clone(),
757                circuit: self.circuit.clone(),
758                val: self.val.transmute::<D2>(),
759            }
760        }
761    }
762}
763
764impl<C, D> Stream<C, D> {
765    /// Returns local node id of the operator or subcircuit that writes to
766    /// this stream.
767    ///
768    /// If the stream originates in a subcircuit, returns the id of the
769    /// subcircuit node.
770    pub fn local_node_id(&self) -> NodeId {
771        self.local_node_id
772    }
773
774    /// Returns global id of the operator that writes to this stream.
775    ///
776    /// If the stream originates in a subcircuit, returns id of the operator
777    /// inside the subcircuit (or one of its subcircuits) that produces the
778    /// contents of the stream.
779    pub fn origin_node_id(&self) -> &GlobalNodeId {
780        &self.origin_node_id
781    }
782
783    pub fn stream_id(&self) -> StreamId {
784        self.stream_id
785    }
786
787    /// Reference to the circuit the stream belongs to.
788    pub fn circuit(&self) -> &C {
789        &self.circuit
790    }
791
792    pub fn ptr_eq<D2>(&self, other: &Stream<C, D2>) -> bool {
793        self.stream_id() == other.stream_id()
794    }
795}
796
797// Internal streams API only used inside this module.
798impl<C, D> Stream<C, D>
799where
800    C: Circuit,
801{
802    /// Create a new stream within the given circuit, connected to the specified
803    /// node id.
804    fn new(circuit: C, node_id: NodeId) -> Self {
805        Self {
806            stream_id: circuit.allocate_stream_id(),
807            local_node_id: node_id,
808            origin_node_id: GlobalNodeId::child_of(&circuit, node_id),
809            circuit,
810            val: RefStreamValue::empty(),
811        }
812    }
813
814    /// Create a stream out of an existing [`RefStreamValue`] with `node_id` as the source.
815    pub fn with_value(circuit: C, node_id: NodeId, val: RefStreamValue<D>) -> Self {
816        Self {
817            stream_id: circuit.allocate_stream_id(),
818            local_node_id: node_id,
819            origin_node_id: GlobalNodeId::child_of(&circuit, node_id),
820            circuit,
821            val,
822        }
823    }
824
825    pub fn value(&self) -> RefStreamValue<D> {
826        self.val.clone()
827    }
828
829    /// Export stream to the parent circuit.
830    ///
831    /// Creates a stream in the parent circuit that contains the last value in
832    /// `self` when the child circuit terminates.
833    ///
834    /// This method currently only works for streams connected to a feedback
835    /// `Z1` operator and will panic for other streams.
836    pub fn export(&self) -> Stream<C::Parent, D>
837    where
838        C::Parent: Circuit,
839        D: 'static,
840    {
841        self.circuit()
842            .cache_get_or_insert_with(ExportId::new(self.stream_id()), || unimplemented!())
843            .clone()
844    }
845
846    /// Call `set_label` on the node that produces this stream.
847    pub fn set_label(&self, key: &str, val: &str) -> Self {
848        self.circuit.set_node_label(&self.origin_node_id, key, val);
849        self.clone()
850    }
851
852    /// Call `get_label` on the node that produces this stream.
853    pub fn get_label(&self, key: &str) -> Option<String> {
854        self.circuit.get_node_label(&self.origin_node_id, key)
855    }
856
857    /// Set persistent id for the operator that produces this stream.
858    pub fn set_persistent_id(&self, name: Option<&str>) -> Self {
859        if let Some(name) = name {
860            self.set_label(LABEL_PERSISTENT_OPERATOR_ID, name)
861        } else {
862            self.clone()
863        }
864    }
865
866    /// Get persistent id for the operator that produces this stream.
867    pub fn get_persistent_id(&self) -> Option<String> {
868        self.get_label(LABEL_PERSISTENT_OPERATOR_ID)
869    }
870}
871
872impl<C, D> Stream<C, D> {
873    /// Create a stream whose origin differs from its node id.
874    fn with_origin(
875        circuit: C,
876        stream_id: StreamId,
877        node_id: NodeId,
878        origin_node_id: GlobalNodeId,
879    ) -> Self {
880        Self {
881            stream_id,
882            local_node_id: node_id,
883            origin_node_id,
884            circuit,
885            val: RefStreamValue::empty(),
886        }
887    }
888}
889
890impl<C, D> Stream<C, D>
891where
892    D: Clone,
893{
894    fn get(&self) -> Ref<'_, StreamValue<D>> {
895        self.val.get()
896    }
897
898    fn val(&self) -> &RefCell<StreamValue<D>> {
899        &self.val.0
900    }
901
902    /// Puts a value in the stream, overwriting the previous value if any.
903    ///
904    /// # Panics
905    ///
906    /// The caller must have exclusive access to the current stream;
907    /// otherwise the method will panic.
908    fn put(&self, d: D) {
909        self.val.put(d);
910    }
911}
912
913/// Stream whose final value is exported to the parent circuit.
914///
915/// The struct bundles a pair of streams emitted by a
916/// [`StrictOperator`](`crate::circuit::operator_traits::StrictOperator`):
917/// a `local` stream inside operator's local circuit and an
918/// export stream available to the parent of the local circuit.
919/// The export stream contains the final value computed by the
920/// operator before `clock_end`.
921pub struct ExportStream<C, D>
922where
923    C: Circuit,
924{
925    pub local: Stream<C, D>,
926    pub export: Stream<C::Parent, D>,
927}
928
929/// Relative location of a circuit in the hierarchy of nested circuits.
930///
931/// `0` refers to the local circuit that a given node or operator belongs
932/// to, `1` - to the parent of the local, circuit, `2` - to the parent's
933/// parent, etc.
934pub type Scope = u16;
935
936/// Node in a circuit.  A node wraps an operator with strongly typed
937/// input and output streams.
938pub trait Node: Any {
939    /// Node id unique within its parent circuit.
940    fn local_id(&self) -> NodeId;
941
942    /// Global node id.
943    fn global_id(&self) -> &GlobalNodeId;
944
945    /// Persistent node id that remains stable across circuit restarts.  This Id can be used
946    /// to pick up operator state from a checkpoint after restart.
947    ///
948    /// * In ephemeral mode, this id is derived from the global node id. Such an Id can be used
949    ///   to recover the operator after a failure, when the circuit is identical to the one that was
950    ///   running before the failure.
951    ///
952    /// * In persistent mode, this id is derived from the operator's persistent Id assigned to it
953    ///   by the compiler during circuit construction.  This Id will remain stable across circuit
954    ///   restarts even if the circuit changes, as long as all ancestors of the node remain the
955    ///   same. The function will return `None` in persistent mode if the operator
956    ///   does not have a compiler-assigned persistent Id.
957    fn persistent_id(&self) -> Option<String> {
958        let worker_index = Runtime::worker_index();
959
960        match Runtime::mode() {
961            Mode::Ephemeral => Some(format!(
962                "{worker_index}-{}",
963                self.global_id().path_as_string()
964            )),
965            Mode::Persistent => self
966                .get_label(LABEL_PERSISTENT_OPERATOR_ID)
967                .map(|operator_id| format!("{worker_index}-{operator_id}")),
968        }
969    }
970
971    /// Operator name, e.g., "Map", "Join", etc.
972    fn name(&self) -> Cow<'static, str>;
973
974    fn is_circuit(&self) -> bool {
975        false
976    }
977
978    /// Is this an input node?
979    fn is_input(&self) -> bool;
980
981    /// `true` if the node encapsulates an asynchronous operator (see
982    /// [`Operator::is_async()`](super::operator_traits::Operator::is_async)).
983    /// `false` for synchronous operators and subcircuits.
984    fn is_async(&self) -> bool;
985
986    /// `true` if the node is ready to execute (see
987    /// [`Operator::ready()`](super::operator_traits::Operator::ready)).
988    /// Always returns `true` for synchronous operators and subcircuits.
989    fn ready(&self) -> bool;
990
991    /// Register callback to be invoked when an asynchronous operator becomes
992    /// ready (see
993    /// [`super::operator_traits::Operator::register_ready_callback`]).
994    fn register_ready_callback(&mut self, _cb: Box<dyn Fn() + Send + Sync>) {}
995
996    /// Evaluate the operator.  Reads one value from each input stream
997    /// and pushes a new value to the output stream (except for sink
998    /// operators, which don't have an output stream).
999    fn eval<'a>(
1000        &'a mut self,
1001    ) -> Pin<Box<dyn Future<Output = Result<Option<Position>, SchedulerError>> + 'a>>;
1002
1003    fn import(&mut self) {}
1004
1005    /// Notify the operator that the circuit is starting a transaction.
1006    fn start_transaction(&mut self);
1007
1008    /// Notify the node about start of a transaction.
1009    /// Call `Operator::flush` on the operator.
1010    fn flush(&mut self);
1011
1012    /// Call `Operator::flush_complete` on the operator.
1013    fn is_flush_complete(&self) -> bool;
1014
1015    /// Notify the node about start of a clock epoch.
1016    ///
1017    /// The node should forward the notification to its inner operator.
1018    ///
1019    /// # Arguments
1020    ///
1021    /// * `scope` - the scope whose clock is restarting. A node gets notified
1022    ///   about clock events in its local circuit (scope 0) and all its
1023    ///   ancestors.
1024    fn clock_start(&mut self, scope: Scope);
1025
1026    /// Notify the node about the end of a clock epoch.
1027    ///
1028    /// The node should forward the notification to its inner operator.
1029    ///
1030    /// # Arguments
1031    ///
1032    /// * `scope` - the scope whose clock is ending.
1033    fn clock_end(&mut self, scope: Scope);
1034
1035    fn init(&mut self) {}
1036
1037    fn metadata(&self, output: &mut OperatorMeta);
1038
1039    fn fixedpoint(&self, scope: Scope) -> bool;
1040
1041    /// Invoke closure on all children of `self`, terminate on the first
1042    /// error.
1043    fn map_nodes_recursive(
1044        &self,
1045        _f: &mut dyn FnMut(&dyn Node) -> Result<(), DbspError>,
1046    ) -> Result<(), DbspError> {
1047        Ok(())
1048    }
1049
1050    /// Invoke closure on all children of `self`, terminate on the first
1051    /// error.
1052    fn map_nodes_recursive_mut(
1053        &self,
1054        _f: &mut dyn FnMut(&mut dyn Node) -> Result<(), DbspError>,
1055    ) -> Result<(), DbspError> {
1056        Ok(())
1057    }
1058
1059    /// Instructs the node to write the state of its inner operator to
1060    /// persistent storage within directory `base`.
1061    ///
1062    /// The node shouldn't commit the state to stable storage; rather, it should
1063    /// append the files to be committed to `files` for later commit.
1064    fn checkpoint(
1065        &mut self,
1066        base: &StoragePath,
1067        files: &mut Vec<Arc<dyn FileCommitter>>,
1068    ) -> Result<(), DbspError>;
1069
1070    /// Instructs the node to restore the state of its inner operator to
1071    /// the given checkpoint in directory `base`.
1072    fn restore(&mut self, base: &StoragePath) -> Result<(), DbspError>;
1073
1074    /// Reset the state of the operator to default.
1075    ///
1076    /// Used during replay, when the replay algorithm determines
1077    /// that the operator that may have previously picked up its state
1078    /// from a checkpoint must be backfilled from clean state.
1079    fn clear_state(&mut self) -> Result<(), DbspError>;
1080
1081    /// Place operator in the replay mode.
1082    ///
1083    /// In the replay mode the operator streams its stored state to a temporary
1084    /// replay stream.
1085    ///
1086    /// # Panics
1087    ///
1088    /// Panics for operators that don't support replay.
1089    fn start_replay(&mut self) -> Result<(), DbspError>;
1090
1091    /// Check if the operator has finished replaying its stored state.
1092    ///
1093    /// # Panics
1094    ///
1095    /// Panics for operators that don't support replay.
1096    fn is_replay_complete(&self) -> bool;
1097
1098    /// Notify the operator that the circuit is exiting the replay mode.
1099    ///
1100    /// The operator can cleanup any state needed for replay at this point.
1101    ///
1102    /// # Panics
1103    ///
1104    /// Panics for operators that don't support replay.
1105    fn end_replay(&mut self) -> Result<(), DbspError>;
1106
1107    /// Takes a fingerprint of the node's inner operator adds it to `fip`.
1108    fn fingerprint(&self, fip: &mut Fingerprinter) {
1109        fip.hash(type_name_of_val(self));
1110    }
1111
1112    /// Tag the node with a text label.
1113    fn set_label(&mut self, key: &str, value: &str);
1114
1115    /// Get the label associated with the given key.
1116    fn get_label(&self, key: &str) -> Option<&str>;
1117
1118    fn labels(&self) -> &BTreeMap<String, String>;
1119
1120    /// Apply closure to a child node of `self`.
1121    fn map_child(&self, _path: &[NodeId], _f: &mut dyn FnMut(&dyn Node)) {
1122        panic!("map_child: not a circuit node")
1123    }
1124
1125    /// Apply closure to a child node of `self`.
1126    fn map_child_mut(&self, _path: &[NodeId], _f: &mut dyn FnMut(&mut dyn Node)) {
1127        panic!("map_child_mut: not a circuit node")
1128    }
1129
1130    fn as_circuit(&self) -> Option<&dyn CircuitBase> {
1131        None
1132    }
1133
1134    fn as_any(&self) -> &dyn Any;
1135}
1136
1137/// Globally unique id of a stream.
1138#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord, Hash, Deserialize, Serialize)]
1139#[repr(transparent)]
1140pub struct StreamId(usize);
1141
1142impl StreamId {
1143    pub fn new(id: usize) -> Self {
1144        Self(id)
1145    }
1146
1147    /// Extracts numeric representation of the stream id.
1148    pub fn id(&self) -> usize {
1149        self.0
1150    }
1151}
1152
1153impl Display for StreamId {
1154    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
1155        f.write_char('s')?;
1156        Debug::fmt(&self.0, f)
1157    }
1158}
1159
1160/// Id of an operator, guaranteed to be unique within a circuit.
1161#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord, Hash, Serialize, Deserialize)]
1162#[repr(transparent)]
1163pub struct NodeId(usize);
1164
1165impl NodeId {
1166    pub fn new(id: usize) -> Self {
1167        Self(id)
1168    }
1169
1170    /// Extracts numeric representation of the node id.
1171    pub fn id(&self) -> usize {
1172        self.0
1173    }
1174
1175    pub(super) fn root() -> Self {
1176        Self(0)
1177    }
1178}
1179
1180impl Display for NodeId {
1181    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
1182        f.write_char('n')?;
1183        Debug::fmt(&self.0, f)
1184    }
1185}
1186
1187/// Globally unique id of a node (operator or subcircuit).
1188///
1189/// The identifier consists of a path from the top-level circuit to the node.
1190/// The top-level circuit has global id `[]`, an operator in the top-level
1191/// circuit or a sub-circuit nested inside the top-level circuit will have a
1192/// path of length 1, e.g., `[5]`, an operator inside the nested circuit
1193/// will have a path of length 2, e.g., `[5, 1]`, etc.
1194#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord, Hash)]
1195#[repr(transparent)]
1196pub struct GlobalNodeId(Vec<NodeId>);
1197
1198impl Serialize for GlobalNodeId {
1199    /// Serialize as a string containing all node ids
1200    fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
1201    where
1202        S: Serializer,
1203    {
1204        let s = self.node_identifier();
1205        serializer.serialize_str(&s)
1206    }
1207}
1208
1209impl Display for GlobalNodeId {
1210    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
1211        f.write_str("[")?;
1212        let path = self.path();
1213        for i in 0..path.len() {
1214            f.write_str(&path[i].0.to_string())?;
1215            if i < path.len() - 1 {
1216                f.write_str(".")?;
1217            }
1218        }
1219        f.write_str("]")
1220    }
1221}
1222
1223impl GlobalNodeId {
1224    /// Generate global node id from path.
1225    pub fn from_path(path: &[NodeId]) -> Self {
1226        Self(path.to_owned())
1227    }
1228
1229    /// Generate global node id from path.
1230    pub fn from_path_vec(path: Vec<NodeId>) -> Self {
1231        Self(path)
1232    }
1233
1234    pub fn root() -> Self {
1235        Self(Vec::new())
1236    }
1237
1238    /// Generate global node id by appending `child_id` to `self`.
1239    pub fn child(&self, child_id: NodeId) -> Self {
1240        let mut path = Vec::with_capacity(self.path().len() + 1);
1241        for id in self.path() {
1242            path.push(*id);
1243        }
1244        path.push(child_id);
1245        Self(path)
1246    }
1247
1248    /// Generate global node id for a child node of `circuit`.
1249    pub fn child_of<C>(circuit: &C, node_id: NodeId) -> Self
1250    where
1251        C: Circuit,
1252    {
1253        let mut ids = circuit.global_node_id().path().to_owned();
1254        ids.push(node_id);
1255        Self(ids)
1256    }
1257
1258    /// Generate unique name to use as a node label in a visual graph.
1259    pub fn node_identifier(&self) -> String {
1260        let mut node_ident = "n".to_string();
1261
1262        for i in 0..self.path().len() {
1263            node_ident.push_str(&self.path()[i].to_string());
1264            if i < self.path().len() - 1 {
1265                node_ident.push('_');
1266            }
1267        }
1268        node_ident
1269    }
1270
1271    /// Returns local node id of `self` or `None` if `self` is the root node.
1272    pub fn local_node_id(&self) -> Option<NodeId> {
1273        self.0.last().cloned()
1274    }
1275
1276    /// Returns parent id of `self` or `None` if `self` is the root node.
1277    pub fn parent_id(&self) -> Option<Self> {
1278        self.0
1279            .split_last()
1280            .map(|(_, prefix)| GlobalNodeId::from_path(prefix))
1281    }
1282
1283    /// Returns `true` if `self` is a child of `parent`.
1284    pub fn is_child_of(&self, parent: &Self) -> bool {
1285        self.parent_id().as_ref() == Some(parent)
1286    }
1287
1288    /// Get the path from global.
1289    pub fn path(&self) -> &[NodeId] {
1290        &self.0
1291    }
1292
1293    /// Ancestor of `self` in the top-level circuit.
1294    ///
1295    /// For a top-level node, its node id. For a nested node,
1296    /// returns the node id of the ancestor whose parent is the root node.
1297    ///
1298    /// # Panic
1299    ///
1300    /// Panics if `self` is the root node.
1301    pub fn top_level_ancestor(&self) -> NodeId {
1302        self.0[0]
1303    }
1304
1305    /// Convert to string in the `x-y-z` format.
1306    pub(crate) fn path_as_string(&self) -> String {
1307        self.0
1308            .iter()
1309            .map(|node_id| node_id.0.to_string())
1310            .collect::<Vec<_>>()
1311            .join("-")
1312    }
1313
1314    /// Format global node id as LIR node id.
1315    pub fn lir_node_id(&self) -> LirNodeId {
1316        LirNodeId::new(&self.path_as_string())
1317    }
1318}
1319
1320type CircuitEventHandler = Box<dyn Fn(&CircuitEvent)>;
1321type SchedulerEventHandler = Box<dyn FnMut(&SchedulerEvent<'_>)>;
1322type CircuitEventHandlers = Rc<RefCell<HashMap<String, CircuitEventHandler>>>;
1323type SchedulerEventHandlers = Rc<RefCell<HashMap<String, SchedulerEventHandler>>>;
1324
1325/// Operator's preference to consume input data by value.
1326///
1327/// # Background
1328///
1329/// A stream in a circuit can be connected to multiple consumers.  It is
1330/// therefore generally impossible to provide each consumer with an owned copy
1331/// of the data without cloning it.  At the same time, many operators can be
1332/// more efficient when working with owned inputs.  For instance, when computing
1333/// a sum of two z-sets, if one of the input z-sets is owned we can just add
1334/// values from the other z-set to it without cloning the first z-set.  If both
1335/// inputs are owned then we additionally do not need to clone key/value pairs
1336/// when inserting them.  Furthermore, the implementation can choose to add the
1337/// contents of the smaller z-set to the larger one.
1338///
1339/// # Ownership-aware scheduling
1340///
1341/// To leverage such optimizations, we adopt the best-effort approach: operators
1342/// consume streaming data by-value when possible while falling back to
1343/// pass-by-reference otherwise.  In a synchronous circuit, each operator reads
1344/// its input stream precisely once in each clock cycle. It is therefore
1345/// possible to determine the last consumer at each clock cycle and give it the
1346/// owned value from the channel.  It is furthermore possible for the scheduler
1347/// to schedule operators that strongly prefer owned values last.
1348///
1349/// We capture ownership preferences at two levels.  First, each individual
1350/// operator that consumes one or more streams exposes its preferences on a
1351/// per-stream basis via an API method (e.g.,
1352/// [`UnaryOperator::input_preference`]).  The [`Circuit`] API allows the
1353/// circuit builder to override these preferences when instantiating an
1354/// operator, taking into account circuit topology and workload.  We express
1355/// preference as a numeric value.
1356///
1357/// These preferences are associated with each edge in the circuit graph.  The
1358/// schedulers we have built so far implement a limited form of ownership-aware
1359/// scheduling.  They only consider strong preferences
1360/// ([`OwnershipPreference::STRONGLY_PREFER_OWNED`] and stronger) and model them
1361/// internally as hard constraints that must be satisfied for the circuit to be
1362/// schedulable.  Weaker preferences are ignored.
1363#[derive(Clone, Copy, PartialEq, Eq, PartialOrd, Ord, Debug, Hash)]
1364#[repr(transparent)]
1365pub struct OwnershipPreference(usize);
1366
1367impl OwnershipPreference {
1368    /// Create a new instance with given numeric preference value (higher
1369    /// value means stronger preference).
1370    pub const fn new(val: usize) -> Self {
1371        Self(val)
1372    }
1373
1374    /// The operator does not gain any speed up from consuming an owned value.
1375    pub const INDIFFERENT: Self = Self::new(0);
1376
1377    /// The operator is likely to run faster provided an owned input, but
1378    /// shouldn't be prioritized over more impactful operators
1379    ///
1380    /// This gives a lower priority than [`Self::PREFER_OWNED`] so that
1381    /// operators who need ownership can get it when available
1382    pub const WEAKLY_PREFER_OWNED: Self = Self::new(40);
1383
1384    /// The operator is likely to run faster provided an owned input.
1385    ///
1386    /// Preference levels above `PREFER_OWNED` should not be used by operators
1387    /// and are reserved for use by circuit builders through the [`Circuit`]
1388    /// API.
1389    pub const PREFER_OWNED: Self = Self::new(50);
1390
1391    /// The circuit will suffer a significant performance hit if the operator
1392    /// cannot consume data in the channel by-value.
1393    pub const STRONGLY_PREFER_OWNED: Self = Self::new(100);
1394
1395    /// Returns the numeric value of the preference.
1396    pub const fn raw(&self) -> usize {
1397        self.0
1398    }
1399}
1400
1401impl Default for OwnershipPreference {
1402    #[inline]
1403    fn default() -> Self {
1404        Self::INDIFFERENT
1405    }
1406}
1407
1408impl Display for OwnershipPreference {
1409    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
1410        match *self {
1411            Self::INDIFFERENT => f.write_str("Indifferent"),
1412            Self::WEAKLY_PREFER_OWNED => f.write_str("WeaklyPreferOwned"),
1413            Self::PREFER_OWNED => f.write_str("PreferOwned"),
1414            Self::STRONGLY_PREFER_OWNED => f.write_str("StronglyPreferOwned"),
1415            Self(preference) => write!(f, "Preference({preference})"),
1416        }
1417    }
1418}
1419
1420/// An edge in a circuit graph represents a stream connecting two
1421/// operators or a dependency (i.e., a requirement that one operator
1422/// must be evaluated before the other even if they are not connected
1423/// by a stream).
1424#[derive(Clone)]
1425pub struct Edge {
1426    /// Source node.
1427    pub from: NodeId,
1428    /// Destination node.
1429    pub to: NodeId,
1430    /// Origin node that generates the stream.  If the origin belongs
1431    /// to the local circuit, this is just the full path to the `from`
1432    /// node.
1433    pub origin: GlobalNodeId,
1434    /// Stream associated with the edge, if any.  If `None`, this is a
1435    /// dependency edge.
1436    pub stream: Option<Box<dyn StreamMetadata>>,
1437    /// Ownership preference associated with the consumer of this
1438    /// stream or `None` if this is a dependency edge.
1439    pub ownership_preference: Option<OwnershipPreference>,
1440}
1441
1442#[allow(dead_code)]
1443impl Edge {
1444    /// `true` if `self` is a dependency edge.
1445    pub(crate) fn is_dependency(&self) -> bool {
1446        self.ownership_preference.is_none()
1447    }
1448
1449    /// `true` if `self` is a stream edge.
1450    pub(crate) fn is_stream(&self) -> bool {
1451        self.stream.is_some()
1452    }
1453
1454    pub(crate) fn stream_id(&self) -> Option<StreamId> {
1455        self.stream.as_ref().map(|meta| meta.stream_id())
1456    }
1457}
1458
1459circuit_cache_key!(ExportId<C, D>(StreamId => Stream<C, D>));
1460
1461// `stream` => `replay_stream` mapping designates `replay_stream`
1462// as a replay source for `stream`.
1463circuit_cache_key!(ReplaySource(StreamId => Box<dyn StreamMetadata>));
1464
1465/// Register `replay_stream` as a replay source for `stream`.
1466pub(crate) fn register_replay_stream<C, B>(
1467    circuit: &C,
1468    stream: &Stream<C, B>,
1469    replay_stream: &Stream<C, B>,
1470) where
1471    C: Circuit,
1472    B: 'static,
1473{
1474    // We currently only support using operators in the top-level circuit
1475    // as replay sources.
1476    if TypeId::of::<()>() == TypeId::of::<C::Time>() {
1477        circuit.cache_insert(
1478            ReplaySource::new(stream.stream_id()),
1479            Box::new(replay_stream.clone()),
1480        );
1481    }
1482}
1483
1484/// Trait for an object that has a clock associated with it.
1485/// This is implemented trivially for root circuits.
1486pub trait WithClock {
1487    /// `()` for a trivial zero-dimensional clock that doesn't need to count
1488    /// ticks.
1489    type Time: Timestamp;
1490
1491    /// Current time.
1492    fn time(&self) -> Self::Time;
1493}
1494
1495/// This `impl` is only needed to bootstrap the
1496/// recursive definition of `WithClock` for `ChildCircuit`.
1497/// It is never actually used at runtime.
1498impl WithClock for () {
1499    type Time = UnitTimestamp;
1500
1501    fn time(&self) -> Self::Time {
1502        UnitTimestamp
1503    }
1504}
1505
1506impl<P, T> WithClock for ChildCircuit<P, T>
1507where
1508    P: 'static,
1509    T: Timestamp,
1510{
1511    type Time = T;
1512
1513    fn time(&self) -> Self::Time {
1514        self.time.borrow().clone()
1515    }
1516}
1517
1518// TODO: use a better type than serde_json::Value.
1519#[derive(Default, Debug, Clone, Serialize, Deserialize)]
1520pub struct CircuitMetadata {
1521    metadata: HashMap<NodeId, serde_json::Value>,
1522}
1523
1524#[derive(Default, Debug)]
1525pub struct MetadataExchangeInner {
1526    /// Metadata registered by operators running in the current worker.
1527    local_metadata: RefCell<CircuitMetadata>,
1528
1529    /// Metadata received from peers. All workers have identical metadata snapshots during a step.
1530    /// This is a vector of metadata snapshots from all workers, one for each worker.
1531    global_metadata: RefCell<Vec<CircuitMetadata>>,
1532}
1533
1534/// Metadata exchange.
1535///
1536/// Allows the worker to exchange arbitrary semi-structured data with its peers.
1537///
1538/// Every operator in the circuit can update its local metadata.
1539/// Before every step, the circuit broadcasts its metadata to all other workers
1540/// and receives their metadata. As a result all workers have identical metadata
1541/// snapshots during the step and can make deterministic decisions based on it, such
1542/// as choosing a balancing policy for a stream.
1543#[derive(Default, Debug, Clone)]
1544pub struct MetadataExchange {
1545    inner: Rc<MetadataExchangeInner>,
1546}
1547
1548impl MetadataExchange {
1549    fn new() -> Self {
1550        Self::default()
1551    }
1552
1553    /// Get the current snapshot of the local metadata registered by operators running in the current worker.
1554    pub fn local_metadata(&self) -> CircuitMetadata {
1555        self.inner.local_metadata.borrow().clone()
1556    }
1557
1558    /// Update the local metadata for the operator with the given id.
1559    pub fn set_local_operator_metadata(&self, id: NodeId, metadata: serde_json::Value) {
1560        self.inner
1561            .local_metadata
1562            .borrow_mut()
1563            .metadata
1564            .insert(id, metadata.clone());
1565    }
1566
1567    /// Clear the local metadata for the operator with the given id.
1568    pub fn clear_local_operator_metadata(&self, id: NodeId) {
1569        self.inner.local_metadata.borrow_mut().metadata.remove(&id);
1570    }
1571
1572    /// Update the local metadata for the operator with the given id by serializing `metadata` to a JSON value.
1573    pub fn set_local_operator_metadata_typed<T>(&self, id: NodeId, metadata: T)
1574    where
1575        T: Serialize,
1576    {
1577        self.inner
1578            .local_metadata
1579            .borrow_mut()
1580            .metadata
1581            .insert(id, serde_json::to_value(metadata).unwrap());
1582    }
1583
1584    /// Get the current snapshot of the local metadata for the operator with the given id.
1585    pub fn get_local_operator_metadata(&self, id: NodeId) -> Option<serde_json::Value> {
1586        self.inner
1587            .local_metadata
1588            .borrow()
1589            .metadata
1590            .get(&id)
1591            .cloned()
1592    }
1593
1594    pub fn get_local_operator_metadata_typed<T>(&self, id: NodeId) -> Option<T>
1595    where
1596        T: DeserializeOwned,
1597    {
1598        self.get_local_operator_metadata(id)
1599            .map(|val| serde_json::from_value::<T>(val).unwrap())
1600    }
1601
1602    /// Set the global metadata received from peers (invoked by the scheduler).
1603    pub fn set_global_metadata(&self, global_metadata: Vec<CircuitMetadata>) {
1604        *self.inner.global_metadata.borrow_mut() = global_metadata;
1605    }
1606
1607    pub fn get_global_metadata(&self) -> Vec<CircuitMetadata> {
1608        self.inner.global_metadata.borrow().clone()
1609    }
1610
1611    /// Get metadata for the operator with the given id received from all workers before the current step.
1612    pub fn get_global_operator_metadata(&self, id: NodeId) -> Vec<Option<serde_json::Value>> {
1613        self.inner
1614            .global_metadata
1615            .borrow()
1616            .iter()
1617            .map(|global_metadata| global_metadata.metadata.get(&id).cloned())
1618            .collect()
1619    }
1620
1621    /// Get metadata for the operator with the given id received from all workers before the current step.
1622    /// Deserialize it from a JSON value to a strongly typed representation `T`.
1623    ///
1624    /// # Panic
1625    ///
1626    /// Panics if the JSON value cannot be deserialized to a strongly typed representation `T`.
1627    pub fn get_global_operator_metadata_typed<T>(&self, id: NodeId) -> Vec<Option<T>>
1628    where
1629        T: DeserializeOwned,
1630    {
1631        self.inner
1632            .global_metadata
1633            .borrow()
1634            .iter()
1635            .map(|global_metadata| {
1636                global_metadata
1637                    .metadata
1638                    .get(&id)
1639                    .cloned()
1640                    .map(|val| serde_json::from_value::<T>(val).unwrap())
1641            })
1642            .collect()
1643    }
1644}
1645
1646/// An object-safe subset of the circuit API.
1647pub trait CircuitBase: 'static {
1648    fn edges(&self) -> Ref<'_, Edges>;
1649
1650    fn edges_mut(&self) -> RefMut<'_, Edges>;
1651
1652    /// Global id of the circuit node.
1653    ///
1654    /// Returns [`GlobalNodeId::root()`] for the root circuit.
1655    fn global_id(&self) -> &GlobalNodeId;
1656
1657    /// Number of nodes in the circuit.
1658    fn num_nodes(&self) -> usize;
1659
1660    /// Returns vector of local node ids in the circuit.
1661    fn node_ids(&self) -> Vec<NodeId>;
1662
1663    fn import_nodes(&self) -> Vec<NodeId>;
1664
1665    fn clear(&mut self);
1666
1667    /// Register a dependency between `from` and `to` nodes.  A dependency tells
1668    /// the scheduler that `from` must be evaluated before `to` in each
1669    /// clock cycle even though there may not be an edge or a path
1670    /// connecting them.
1671    fn add_dependency(&self, from: NodeId, to: NodeId);
1672
1673    /// The set of transitive ancestors for each node in the circuit,
1674    /// i.e., the set of all nodes that precede it (transitively) in the `Edges` relationship.
1675    fn transitive_ancestors(&self) -> BTreeMap<NodeId, BTreeSet<NodeId>>;
1676
1677    /// Allocate a new globally unique stream id.  This method can be invoked on any circuit in the pipeline,
1678    /// since all of them maintain a shared global counter.
1679    fn allocate_stream_id(&self) -> StreamId;
1680
1681    /// Reference to the global counter shared by all circuits.
1682    fn last_stream_id(&self) -> RefCell<StreamId>;
1683
1684    /// Relative depth of `self` from the root circuit.
1685    ///
1686    /// Returns 0 if `self` is the root circuit, 1 if `self` is an immediate
1687    /// child of the root circuit, etc.
1688    fn root_scope(&self) -> Scope;
1689
1690    /// Circuit's node id within the parent circuit.
1691    fn node_id(&self) -> NodeId;
1692
1693    /// Circuit's global node id.
1694    fn global_node_id(&self) -> GlobalNodeId;
1695
1696    /// Apply `f` to the node with the specified `path` relative to `self`.
1697    fn map_node_relative(&self, path: &[NodeId], f: &mut dyn FnMut(&dyn Node));
1698
1699    /// Apply `f` to the node with the specified `path` relative to `self`.
1700    fn map_node_mut_relative(&self, path: &[NodeId], f: &mut dyn FnMut(&mut dyn Node));
1701
1702    /// Recursively apply `f` to all nodes in `self` and its children.
1703    ///
1704    /// Stop at the first error.
1705    fn map_nodes_recursive(
1706        &self,
1707        f: &mut dyn FnMut(&dyn Node) -> Result<(), DbspError>,
1708    ) -> Result<(), DbspError>;
1709
1710    /// Recursively apply `f` to all nodes in `self` and its children mutably.
1711    ///
1712    /// Stop at the first error.
1713    fn map_nodes_recursive_mut(
1714        &mut self,
1715        f: &mut dyn FnMut(&mut dyn Node) -> Result<(), DbspError>,
1716    ) -> Result<(), DbspError>;
1717
1718    /// Apply `f` to all immediate children of `self`.
1719    ///
1720    /// Stop at the first error.
1721    fn map_local_nodes(
1722        &self,
1723        f: &mut dyn FnMut(&dyn Node) -> Result<(), DbspError>,
1724    ) -> Result<(), DbspError>;
1725
1726    /// Apply `f` to all immediate children of `self`.
1727    fn map_local_nodes_mut(
1728        &self,
1729        f: &mut dyn FnMut(&mut dyn Node) -> Result<(), DbspError>,
1730    ) -> Result<(), DbspError>;
1731
1732    /// Apply closure `f` to a node with specified node id.
1733    ///
1734    /// # Panic
1735    ///
1736    /// Panics if `id` is not a valid Id of a node in `self`.
1737    fn apply_local_node_mut(&self, id: NodeId, f: &mut dyn FnMut(&mut dyn Node));
1738
1739    /// Apply `f` to all immediate subcircuits of `self`.
1740    ///
1741    /// Stop at the first error.
1742    fn map_subcircuits(
1743        &self,
1744        f: &mut dyn FnMut(&dyn CircuitBase) -> Result<(), DbspError>,
1745    ) -> Result<(), DbspError>;
1746
1747    /// Tag the node with a text label.
1748    ///
1749    /// # Panic
1750    ///
1751    /// Panics if `id` is not a valid Id of a node in `self` or one of its children or
1752    /// if there is another mutable or immutable reference to the node.
1753    fn set_node_label(&self, id: &GlobalNodeId, key: &str, val: &str);
1754
1755    fn set_persistent_node_id(&self, id: &GlobalNodeId, persistent_id: Option<&str>) {
1756        if let Some(persistent_id) = persistent_id {
1757            self.set_node_label(id, LABEL_PERSISTENT_OPERATOR_ID, persistent_id);
1758        }
1759    }
1760
1761    fn set_mir_node_id(&self, id: &GlobalNodeId, mir_id: Option<&str>) {
1762        if let Some(mir_id) = mir_id {
1763            self.set_node_label(id, LABEL_MIR_NODE_ID, mir_id);
1764        }
1765    }
1766
1767    /// Get node label.
1768    ///
1769    /// # Panic
1770    ///
1771    /// Panics if `id` is not a valid Id of a node in `self` or one of its children or
1772    /// if there is another mutable or immutable reference to the node.
1773    fn get_node_label(&self, id: &GlobalNodeId, key: &str) -> Option<String>;
1774
1775    /// Node label for persistent operator id.
1776    fn get_persistent_node_id(&self, id: &GlobalNodeId) -> Option<String> {
1777        self.get_node_label(id, LABEL_PERSISTENT_OPERATOR_ID)
1778    }
1779
1780    fn check_fixedpoint(&self, scope: Scope) -> bool;
1781
1782    fn notify_start_transaction(&self) {
1783        let _ = self.map_local_nodes_mut(&mut |node| {
1784            node.start_transaction();
1785            Ok(())
1786        });
1787    }
1788
1789    /// Return the metadata exchange object associated with the circuit.
1790    fn metadata_exchange(&self) -> &MetadataExchange;
1791
1792    /// Return the balancer object associated with the circuit.
1793    fn balancer(&self) -> &Balancer;
1794
1795    /// Set the balancer hint for the operator with the given global node id.
1796    ///
1797    /// # Errors
1798    ///
1799    /// Returns an error if the operator with the given global node id is not found
1800    /// or if the hint contradicts the current balancer policy.
1801    fn set_balancer_hint(
1802        &self,
1803        global_node_id: &GlobalNodeId,
1804        hint: BalancerHint,
1805    ) -> Result<(), DbspError>;
1806
1807    /// Get the current balancer policy for all streams managed by the balancer.
1808    fn get_current_balancer_policy(&self) -> BTreeMap<NodeId, PartitioningPolicy>;
1809}
1810
1811/// The circuit interface.  All DBSP computation takes place within a circuit.
1812///
1813/// Circuits can nest.  The nesting hierarchy must be known statically at
1814/// compile time via the `Parent` associated type, which must be `()` for a root
1815/// circuit and otherwise the parent circuit's type.
1816///
1817/// A circuit has a clock represented by the `Time` associated type obtained via
1818/// the `WithClock` supertrait.  For a root circuit, this is a trivial
1819/// zero-dimensional clock that doesn't need to count ticks.
1820///
1821/// There is only one implementation, [`ChildCircuit<P>`], whose `Parent` type
1822/// is `P`.  [`RootCircuit`] is a synonym for `ChildCircuit<()>`.
1823pub trait Circuit: CircuitBase + Clone + WithClock {
1824    /// Parent circuit type or `()` for the root circuit.
1825    type Parent;
1826
1827    /// Returns the parent circuit of `self`.
1828    fn parent(&self) -> Self::Parent;
1829
1830    /// Return the root of the circuit tree.
1831    fn root_circuit(&self) -> RootCircuit;
1832
1833    /// Check if `this` and `other` refer to the same circuit instance.
1834    fn ptr_eq(this: &Self, other: &Self) -> bool;
1835
1836    /// Returns circuit event handlers attached to the circuit.
1837    fn circuit_event_handlers(&self) -> CircuitEventHandlers;
1838
1839    /// Returns scheduler event handlers attached to the circuit.
1840    fn scheduler_event_handlers(&self) -> SchedulerEventHandlers;
1841
1842    /// Deliver `event` to all circuit event handlers.
1843    fn log_circuit_event(&self, event: &CircuitEvent);
1844
1845    /// Deliver `event` to all scheduler event handlers.
1846    fn log_scheduler_event(&self, event: &SchedulerEvent<'_>);
1847
1848    /// Apply closure `f` to a node with specified global id.
1849    ///
1850    /// # Panic
1851    ///
1852    /// Panics if `id` is not a valid Id of a node in `self` or one of its children or
1853    /// if there is another mutable reference to the node.
1854    fn map_node<T>(&self, id: &GlobalNodeId, f: &mut dyn FnMut(&dyn Node) -> T) -> T;
1855
1856    /// Apply closure `f` to a node with specified global id.
1857    ///
1858    /// # Panic
1859    ///
1860    /// Panics if `id` is not a valid Id of a node in `self` or one of its children or
1861    /// if there is another mutable or immutable reference to the node.
1862    fn map_node_mut<T>(&self, id: &GlobalNodeId, f: &mut dyn FnMut(&mut dyn Node) -> T) -> T;
1863
1864    /// Apply closure `f` to a node with specified node id.
1865    ///
1866    /// # Panic
1867    ///
1868    /// Panics if `id` is not a valid Id of a node in `self`.
1869    fn map_local_node_mut<T>(&self, id: NodeId, f: &mut dyn FnMut(&mut dyn Node) -> T) -> T;
1870
1871    /// Lookup a value in the circuit cache or create and insert a new value
1872    /// if it does not exist.
1873    ///
1874    /// See [`cache`](`crate::circuit::cache`) module documentation for details.
1875    fn cache_get_or_insert_with<K, F>(&self, key: K, f: F) -> RefMut<'_, K::Value>
1876    where
1877        K: 'static + TypedMapKey<CircuitStoreMarker>,
1878        F: FnMut() -> K::Value;
1879
1880    /// Invoked by the scheduler at the end of a clock cycle, after all circuit
1881    /// operators have been evaluated.
1882    fn tick(&self);
1883
1884    /// Deliver `clock_start` notification to all nodes in the circuit.
1885    fn clock_start(&self, scope: Scope);
1886
1887    /// Deliver `clock_end` notification to all nodes in the circuit.
1888    fn clock_end(&self, scope: Scope);
1889
1890    /// `true` if the specified node is ready to execute (see
1891    /// [`Operator::ready()`](super::operator_traits::Operator::ready)).
1892    fn ready(&self, id: NodeId) -> bool;
1893
1894    /// Insert a value to the circuit cache, overwriting any existing value.
1895    ///
1896    /// See [`cache`](`crate::circuit::cache`) module documentation for
1897    /// details.
1898    fn cache_insert<K>(&self, key: K, val: K::Value)
1899    where
1900        K: TypedMapKey<CircuitStoreMarker> + 'static;
1901
1902    fn cache_contains<K>(&self, key: &K) -> bool
1903    where
1904        K: TypedMapKey<CircuitStoreMarker> + 'static;
1905
1906    fn cache_get<K>(&self, key: &K) -> Option<K::Value>
1907    where
1908        K: TypedMapKey<CircuitStoreMarker> + 'static,
1909        K::Value: Clone;
1910
1911    /// Check if a stream can be replayed, if so, return the replay stream that can be used to
1912    /// replay the contents of `stream_id`.
1913    fn get_replay_source(&self, stream_id: StreamId) -> Option<Box<dyn StreamMetadata>> {
1914        self.cache_get(&ReplaySource::new(stream_id))
1915    }
1916
1917    /// For every edge in `self` with stream id equal to `stream_id`, create an additional replay edge with
1918    /// the same destination node attached to `replay_stream`.
1919    fn add_replay_edges(&self, stream_id: StreamId, replay_stream: &dyn StreamMetadata);
1920
1921    /// Connect `stream` as input to `to`.
1922    fn connect_stream<T: 'static>(
1923        &self,
1924        stream: &Stream<Self, T>,
1925        to: NodeId,
1926        ownership_preference: OwnershipPreference,
1927    );
1928
1929    fn register_ready_callback(&self, id: NodeId, cb: Box<dyn Fn() + Send + Sync>);
1930
1931    fn is_async_node(&self, id: NodeId) -> bool;
1932
1933    /// Evaluate operator with the given id.
1934    ///
1935    /// This method should only be used by schedulers.
1936    fn eval_node(
1937        &self,
1938        id: NodeId,
1939    ) -> impl Future<Output = Result<Option<Position>, SchedulerError>>;
1940
1941    /// Evaluate import node to pull inputs from the parent circuit.
1942    fn eval_import_node(&self, id: NodeId);
1943
1944    fn flush_node(&self, id: NodeId);
1945
1946    fn is_flush_complete(&self, id: NodeId) -> bool;
1947
1948    /// Evaluate closure `f` inside a new circuit region.
1949    ///
1950    /// A region is a logical grouping of circuit nodes.  Regions are used
1951    /// exclusively for debugging and do not affect scheduling or evaluation
1952    /// of the circuit.  This function creates a new region and executes
1953    /// closure `f` inside it.  Any operators or subcircuits created by
1954    /// `f` will belong to the new region.
1955    #[track_caller]
1956    fn region<F, T>(&self, name: &str, f: F) -> T
1957    where
1958        F: FnOnce() -> T;
1959
1960    /// Add a dependency from `preprocessor_node_id` to all input operators in the
1961    /// circuit, making sure that this node and all its predecessors
1962    /// are evaluated before the rest of the circuit.
1963    fn add_preprocessor(&self, preprocessor_node_id: NodeId);
1964
1965    /// Add a source operator to the circuit.  See [`SourceOperator`].
1966    fn add_source<O, Op>(&self, operator: Op) -> Stream<Self, O>
1967    where
1968        O: Data,
1969        Op: SourceOperator<O>;
1970
1971    /// Add a pair of operators that implement cross-worker communication.
1972    ///
1973    /// Operators that exchange data across workers are split into two
1974    /// operators: the **sender** responsible for partitioning values read
1975    /// from the input stream and distributing them across workers and the
1976    /// **receiver**, which receives and reassembles data received from its
1977    /// peers.  Splitting communication into two halves allows the scheduler
1978    /// to schedule useful work in between them instead of blocking to wait
1979    /// for the receiver.
1980    ///
1981    /// Exchange operators use some form of IPC or shared memory instead of
1982    /// streams to communicate.  Therefore, the sender must implement trait
1983    /// [`SinkOperator`], while the receiver implements [`SourceOperator`].
1984    ///
1985    /// This function adds both operators to the circuit and registers a
1986    /// dependency between them, making sure that the scheduler will
1987    /// evaluate the sender before the receiver even though there is no
1988    /// explicit stream connecting them.
1989    ///
1990    /// Returns the output stream produced by the receiver operator.
1991    ///
1992    /// # Arguments
1993    ///
1994    /// * `sender` - the sender half of the pair.  The sender must be a sink
1995    ///   operator
1996    /// * `receiver` - the receiver half of the pair.  Must be a source
1997    /// * `input_stream` - stream to connect as input to the `sender`.
1998    fn add_exchange<I, SndOp, O, RcvOp>(
1999        &self,
2000        sender: SndOp,
2001        receiver: RcvOp,
2002        input_stream: &Stream<Self, I>,
2003    ) -> Stream<Self, O>
2004    where
2005        I: Data,
2006        O: Data,
2007        SndOp: SinkOperator<I>,
2008        RcvOp: SourceOperator<O>;
2009
2010    /// Like [`Self::add_exchange`], but overrides the ownership
2011    /// preference on the input stream with `input_preference`.
2012    fn add_exchange_with_preference<I, SndOp, O, RcvOp>(
2013        &self,
2014        sender: SndOp,
2015        receiver: RcvOp,
2016        input_stream: &Stream<Self, I>,
2017        input_preference: OwnershipPreference,
2018    ) -> Stream<Self, O>
2019    where
2020        I: Data,
2021        O: Data,
2022        SndOp: SinkOperator<I>,
2023        RcvOp: SourceOperator<O>;
2024
2025    /// Add a sink operator (see [`SinkOperator`]).
2026    fn add_sink<I, Op>(&self, operator: Op, input_stream: &Stream<Self, I>) -> GlobalNodeId
2027    where
2028        I: Data,
2029        Op: SinkOperator<I>;
2030
2031    /// Like [`Self::add_sink`], but overrides the ownership preference on the
2032    /// input stream with `input_preference`.
2033    fn add_sink_with_preference<I, Op>(
2034        &self,
2035        operator: Op,
2036        input_stream: &Stream<Self, I>,
2037        input_preference: OwnershipPreference,
2038    ) -> GlobalNodeId
2039    where
2040        I: Data,
2041        Op: SinkOperator<I>;
2042
2043    /// Add a binary sink operator (see [`BinarySinkOperator`]).
2044    fn add_binary_sink<I1, I2, Op>(
2045        &self,
2046        operator: Op,
2047        input_stream1: &Stream<Self, I1>,
2048        input_stream2: &Stream<Self, I2>,
2049    ) where
2050        I1: Data,
2051        I2: Data,
2052        Op: BinarySinkOperator<I1, I2>;
2053
2054    /// Like [`Self::add_binary_sink`], but overrides the ownership preferences
2055    /// on both input streams with `input_preference1` and
2056    /// `input_preference2`.
2057    fn add_binary_sink_with_preference<I1, I2, Op>(
2058        &self,
2059        operator: Op,
2060        input_stream1: (&Stream<Self, I1>, OwnershipPreference),
2061        input_stream2: (&Stream<Self, I2>, OwnershipPreference),
2062    ) where
2063        I1: Data,
2064        I2: Data,
2065        Op: BinarySinkOperator<I1, I2>;
2066
2067    /// Add a ternary sink operator (see [`TernarySinkOperator`]).
2068    fn add_ternary_sink<I1, I2, I3, Op>(
2069        &self,
2070        operator: Op,
2071        input_stream1: &Stream<Self, I1>,
2072        input_stream2: &Stream<Self, I2>,
2073        input_stream3: &Stream<Self, I3>,
2074    ) -> GlobalNodeId
2075    where
2076        I1: Data,
2077        I2: Data,
2078        I3: Data,
2079        Op: TernarySinkOperator<I1, I2, I3>;
2080
2081    /// Like [`Self::add_ternary_sink`], but overrides the ownership preferences
2082    /// on input streams.
2083    fn add_ternary_sink_with_preference<I1, I2, I3, Op>(
2084        &self,
2085        operator: Op,
2086        input_stream1: (&Stream<Self, I1>, OwnershipPreference),
2087        input_stream2: (&Stream<Self, I2>, OwnershipPreference),
2088        input_stream3: (&Stream<Self, I3>, OwnershipPreference),
2089    ) -> GlobalNodeId
2090    where
2091        I1: Data,
2092        I2: Data,
2093        I3: Data,
2094        Op: TernarySinkOperator<I1, I2, I3>;
2095
2096    /// Add a unary operator (see [`UnaryOperator`]).
2097    fn add_unary_operator<I, O, Op>(
2098        &self,
2099        operator: Op,
2100        input_stream: &Stream<Self, I>,
2101    ) -> Stream<Self, O>
2102    where
2103        I: Data,
2104        O: Data,
2105        Op: UnaryOperator<I, O>;
2106
2107    /// Like [`Self::add_unary_operator`], but overrides the ownership
2108    /// preference on the input stream with `input_preference`.
2109    fn add_unary_operator_with_preference<I, O, Op>(
2110        &self,
2111        operator: Op,
2112        input_stream: &Stream<Self, I>,
2113        input_preference: OwnershipPreference,
2114    ) -> Stream<Self, O>
2115    where
2116        I: Data,
2117        O: Data,
2118        Op: UnaryOperator<I, O>;
2119
2120    /// Add a binary operator (see [`BinaryOperator`]).
2121    fn add_binary_operator<I1, I2, O, Op>(
2122        &self,
2123        operator: Op,
2124        input_stream1: &Stream<Self, I1>,
2125        input_stream2: &Stream<Self, I2>,
2126    ) -> Stream<Self, O>
2127    where
2128        I1: Data,
2129        I2: Data,
2130        O: Data,
2131        Op: BinaryOperator<I1, I2, O>;
2132
2133    /// Like [`Self::add_binary_operator`], but overrides the ownership
2134    /// preference on both input streams with `input_preference1` and
2135    /// `input_preference2` respectively.
2136    fn add_binary_operator_with_preference<I1, I2, O, Op>(
2137        &self,
2138        operator: Op,
2139        input_stream1: (&Stream<Self, I1>, OwnershipPreference),
2140        input_stream2: (&Stream<Self, I2>, OwnershipPreference),
2141    ) -> Stream<Self, O>
2142    where
2143        I1: Data,
2144        I2: Data,
2145        O: Data,
2146        Op: BinaryOperator<I1, I2, O>;
2147
2148    /// Add a ternary operator (see [`TernaryOperator`]).
2149    fn add_ternary_operator<I1, I2, I3, O, Op>(
2150        &self,
2151        operator: Op,
2152        input_stream1: &Stream<Self, I1>,
2153        input_stream2: &Stream<Self, I2>,
2154        input_stream3: &Stream<Self, I3>,
2155    ) -> Stream<Self, O>
2156    where
2157        I1: Data,
2158        I2: Data,
2159        I3: Data,
2160        O: Data,
2161        Op: TernaryOperator<I1, I2, I3, O>;
2162
2163    /// Like [`Self::add_ternary_operator`], but overrides the ownership
2164    /// preference on the input streams with specified values.
2165    #[allow(clippy::too_many_arguments)]
2166    fn add_ternary_operator_with_preference<I1, I2, I3, O, Op>(
2167        &self,
2168        operator: Op,
2169        input_stream1: (&Stream<Self, I1>, OwnershipPreference),
2170        input_stream2: (&Stream<Self, I2>, OwnershipPreference),
2171        input_stream3: (&Stream<Self, I3>, OwnershipPreference),
2172    ) -> Stream<Self, O>
2173    where
2174        I1: Data,
2175        I2: Data,
2176        I3: Data,
2177        O: Data,
2178        Op: TernaryOperator<I1, I2, I3, O>;
2179
2180    /// Add a quaternary operator (see [`QuaternaryOperator`]).
2181    fn add_quaternary_operator<I1, I2, I3, I4, O, Op>(
2182        &self,
2183        operator: Op,
2184        input_stream1: &Stream<Self, I1>,
2185        input_stream2: &Stream<Self, I2>,
2186        input_stream3: &Stream<Self, I3>,
2187        input_stream4: &Stream<Self, I4>,
2188    ) -> Stream<Self, O>
2189    where
2190        I1: Data,
2191        I2: Data,
2192        I3: Data,
2193        I4: Data,
2194        O: Data,
2195        Op: QuaternaryOperator<I1, I2, I3, I4, O>;
2196
2197    /// Like [`Self::add_quaternary_operator`], but overrides the ownership
2198    /// preference on the input streams with specified values.
2199    #[allow(clippy::too_many_arguments)]
2200    fn add_quaternary_operator_with_preference<I1, I2, I3, I4, O, Op>(
2201        &self,
2202        operator: Op,
2203        input_stream1: (&Stream<Self, I1>, OwnershipPreference),
2204        input_stream2: (&Stream<Self, I2>, OwnershipPreference),
2205        input_stream3: (&Stream<Self, I3>, OwnershipPreference),
2206        input_stream4: (&Stream<Self, I4>, OwnershipPreference),
2207    ) -> Stream<Self, O>
2208    where
2209        I1: Data,
2210        I2: Data,
2211        I3: Data,
2212        I4: Data,
2213        O: Data,
2214        Op: QuaternaryOperator<I1, I2, I3, I4, O>;
2215
2216    /// Add a N-ary operator (see [`NaryOperator`]).
2217    fn add_nary_operator<'a, I, O, Op, Iter>(
2218        &'a self,
2219        operator: Op,
2220        input_streams: Iter,
2221    ) -> Stream<Self, O>
2222    where
2223        I: Data,
2224        O: Data,
2225        Op: NaryOperator<I, O>,
2226        Iter: IntoIterator<Item = &'a Stream<Self, I>>;
2227
2228    /// Like [`Self::add_nary_operator`], but overrides the ownership
2229    /// preference with `input_preference`.
2230    fn add_nary_operator_with_preference<'a, I, O, Op, Iter>(
2231        &'a self,
2232        operator: Op,
2233        input_streams: Iter,
2234        input_preference: OwnershipPreference,
2235    ) -> Stream<Self, O>
2236    where
2237        I: Data,
2238        O: Data,
2239        Op: NaryOperator<I, O>,
2240        Iter: IntoIterator<Item = &'a Stream<Self, I>>;
2241
2242    /// Add a feedback loop to the circuit.
2243    ///
2244    /// Other methods in this API only support the construction of acyclic
2245    /// graphs, as they require the input stream to exist before nodes that
2246    /// consumes it are created.  This method instantiates an operator whose
2247    /// input stream can be connected later, and thus may depend on
2248    /// the operator's output.  This enables the construction of feedback loops.
2249    /// Since all loops in a well-formed circuit must include a [strict
2250    /// operator](`crate::circuit::operator_traits::StrictOperator`), `operator`
2251    /// must be [strict](`crate::circuit::operator_traits::StrictOperator`).
2252    ///
2253    /// Returns the output stream of the operator and an object that can be used
2254    /// to later connect its input.
2255    ///
2256    /// # Examples
2257    /// We build the following circuit to compute the sum of input values
2258    /// received from `source`. `z1` stores the sum accumulated during
2259    /// previous timestamps.  At every timestamp, the (`+`) operator
2260    /// computes the sum of the new value received from source and the value
2261    /// stored in `z1`.
2262    ///
2263    /// ```text
2264    ///                ┌─┐
2265    /// source ───────►│+├───┬─►
2266    ///           ┌───►└─┘   │
2267    ///           │          │ z1_feedback
2268    /// z1_output │    ┌──┐  │
2269    ///           └────┤z1│◄─┘
2270    ///                └──┘
2271    /// ```
2272    ///
2273    /// ```
2274    /// # use dbsp::{
2275    /// #     operator::{Z1, Generator},
2276    /// #     Circuit, RootCircuit,
2277    /// # };
2278    /// # let circuit = RootCircuit::build(|circuit| {
2279    /// // Create a data source.
2280    /// let source = circuit.add_source(Generator::new(|| 10));
2281    /// // Create z1.  `z1_output` will contain the output stream of `z1`; `z1_feedback`
2282    /// // is a placeholder where we can later plug the input to `z1`.
2283    /// let (z1_output, z1_feedback) = circuit.add_feedback(Z1::new(0));
2284    /// // Connect outputs of `source` and `z1` to the plus operator.
2285    /// let plus = source.apply2(&z1_output, |n1: &usize, n2: &usize| n1 + n2);
2286    /// // Connect the output of `+` as input to `z1`.
2287    /// z1_feedback.connect(&plus);
2288    /// Ok(())
2289    /// # });
2290    /// ```
2291    fn add_feedback<I, O, Op>(
2292        &self,
2293        operator: Op,
2294    ) -> (Stream<Self, O>, FeedbackConnector<Self, I, O, Op>)
2295    where
2296        I: Data,
2297        O: Data,
2298        Op: StrictUnaryOperator<I, O>;
2299
2300    /// Like `add_feedback`, but also assigns persistent id to the output half of the the strict operator.
2301    fn add_feedback_persistent<I, O, Op>(
2302        &self,
2303        persistent_id: Option<&str>,
2304        operator: Op,
2305    ) -> (Stream<Self, O>, FeedbackConnector<Self, I, O, Op>)
2306    where
2307        I: Data,
2308        O: Data,
2309        Op: StrictUnaryOperator<I, O>,
2310    {
2311        let (output, feedback) = self.add_feedback(operator);
2312
2313        output.set_persistent_id(persistent_id);
2314
2315        (output, feedback)
2316    }
2317
2318    /// Like `add_feedback`, but additionally makes the output of the operator
2319    /// available to the parent circuit.
2320    ///
2321    /// Normally a [strict
2322    /// operator](`crate::circuit::operator_traits::StrictOperator`) writes a
2323    /// value computed based on inputs from previous clock cycles to its
2324    /// output stream at the start of each new clock cycle.  When the local
2325    /// clock epoch ends, the last value computed by the operator (that
2326    /// would otherwise be dropped) is written to the export stream instead.
2327    ///
2328    /// # Examples
2329    ///
2330    /// See example in the [`Self::iterate`] method.
2331    fn add_feedback_with_export<I, O, Op>(
2332        &self,
2333        operator: Op,
2334    ) -> (ExportStream<Self, O>, FeedbackConnector<Self, I, O, Op>)
2335    where
2336        I: Data,
2337        O: Data,
2338        Op: StrictUnaryOperator<I, O>;
2339
2340    /// Like `add_feedback_with_export`, but also assigns persistent id to the output half of the strict operator.
2341    fn add_feedback_with_export_persistent<I, O, Op>(
2342        &self,
2343        persistent_id: Option<&str>,
2344        operator: Op,
2345    ) -> (ExportStream<Self, O>, FeedbackConnector<Self, I, O, Op>)
2346    where
2347        I: Data,
2348        O: Data,
2349        Op: StrictUnaryOperator<I, O>,
2350    {
2351        let (export, feedback) = self.add_feedback_with_export(operator);
2352
2353        export.local.set_persistent_id(persistent_id);
2354
2355        (export, feedback)
2356    }
2357
2358    fn connect_feedback_with_preference<I, O, Op>(
2359        &self,
2360        output_node_id: NodeId,
2361        operator: Rc<RefCell<Op>>,
2362        input_stream: &Stream<Self, I>,
2363        input_preference: OwnershipPreference,
2364    ) where
2365        I: Data,
2366        O: Data,
2367        Op: StrictUnaryOperator<I, O>;
2368
2369    /// Add an iterative child circuit.
2370    ///
2371    /// Creates a child circuit with a nested logical clock.
2372    ///
2373    /// Creates an empty circuit with `self` as parent and invokes
2374    /// `child_constructor` to populate the circuit.  `child_constructor`
2375    /// typically captures some of the streams in `self` and connects them
2376    /// to source nodes of the child circuit.  It is also responsible for
2377    /// attaching an executor to the child circuit.  The return type `T`
2378    /// will typically contain output streams of the child.
2379    ///
2380    /// Most users should invoke higher-level APIs like [`Circuit::iterate`]
2381    /// instead of using this method directly.
2382    fn iterative_subcircuit<F, T, E>(&self, child_constructor: F) -> Result<T, SchedulerError>
2383    where
2384        F: FnOnce(&mut IterativeCircuit<Self>) -> Result<(T, E), SchedulerError>,
2385        E: Executor<IterativeCircuit<Self>>;
2386
2387    /// Like `iterative_subcircuit`, but creates a child circuit that runs on the same
2388    /// clock as the parent.
2389    fn non_iterative_subcircuit<F, T, E>(&self, child_constructor: F) -> Result<T, SchedulerError>
2390    where
2391        F: FnOnce(&mut NonIterativeCircuit<Self>) -> Result<(T, E), SchedulerError>,
2392        E: Executor<NonIterativeCircuit<Self>>;
2393
2394    /// Add an iteratively scheduled child circuit.
2395    ///
2396    /// Add a child circuit with a nested clock.  The child will execute
2397    /// multiple times for each parent timestamp, until its termination
2398    /// condition is satisfied.  Every time the child circuit is activated
2399    /// by the parent (once per parent timestamp), the executor calls
2400    /// [`clock_start`](`super::operator_traits::Operator::clock_start`)
2401    /// on each child operator.  It then calls `eval` on all
2402    /// child operators in a causal order and checks if the termination
2403    /// condition is satisfied.  If the condition is `false`, the
2404    /// executor `eval`s all operators again.  Once the termination
2405    /// condition is `true`, the executor calls `clock_end` on all child
2406    /// operators and returns control back to the parent scheduler.
2407    ///
2408    /// The `constructor` closure populates the child circuit and returns a
2409    /// closure that checks the termination condition and an arbitrary
2410    /// user-defined return value that typically contains output streams
2411    /// of the child.
2412    ///
2413    /// # Examples
2414    ///
2415    /// ```
2416    /// # use std::{cell::RefCell, rc::Rc};
2417    /// use dbsp::{
2418    ///     operator::{Generator, Z1},
2419    ///     Circuit, RootCircuit,
2420    ///     Error as DbspError,
2421    /// };
2422    ///
2423    /// let (circuit_handle, output_handle) = RootCircuit::build(|root_circuit| {
2424    ///     // Generate sequence 0, 1, 2, ...
2425    ///     let mut n: usize = 0;
2426    ///     let source = root_circuit.add_source(Generator::new(move || {
2427    ///         let result = n;
2428    ///         n += 1;
2429    ///         result
2430    ///     }));
2431    ///     // Compute factorial of each number in the sequence.
2432    ///     let fact = root_circuit
2433    ///         .iterate(|child_circuit| {
2434    ///             let counter = Rc::new(RefCell::new(1));
2435    ///             let counter_clone = counter.clone();
2436    ///             let countdown = source.delta0(child_circuit).apply(move |parent_val| {
2437    ///                 let mut counter_borrow = counter_clone.borrow_mut();
2438    ///                 *counter_borrow += *parent_val;
2439    ///                 let res = *counter_borrow;
2440    ///                 *counter_borrow -= 1;
2441    ///                 res
2442    ///             });
2443    ///             let (z1_output, z1_feedback) = child_circuit.add_feedback_with_export(Z1::new(1));
2444    ///             let mul = countdown.apply2(&z1_output.local, |n1: &usize, n2: &usize| n1 * n2);
2445    ///             z1_feedback.connect(&mul);
2446    ///             // Stop iterating when the countdown reaches 0.
2447    ///             Ok((async move || Ok(*counter.borrow() == 0), z1_output.export))
2448    ///         })?;
2449    ///     Ok(fact.output())
2450    /// })?;
2451    ///
2452    /// let factorial = |n: usize| (1..=n).product::<usize>();
2453    /// const ITERATIONS: usize = 10;
2454    /// for i in 0..ITERATIONS {
2455    ///     circuit_handle.transaction()?;
2456    ///     let result = output_handle.take_from_all();
2457    ///     let result = result.first().unwrap();
2458    ///     println!("Iteration {:3}: {:3}! = {}", i + 1, i, result);
2459    ///     assert_eq!(*result, factorial(i));
2460    /// }
2461    ///
2462    /// Ok::<(), DbspError>(())
2463    /// ```
2464    fn iterate<F, C, T>(&self, constructor: F) -> Result<T, SchedulerError>
2465    where
2466        F: FnOnce(&mut IterativeCircuit<Self>) -> Result<(C, T), SchedulerError>,
2467        C: AsyncFn() -> Result<bool, SchedulerError> + 'static;
2468
2469    /// Add an iteratively scheduled child circuit.
2470    ///
2471    /// Similar to [`iterate`](`Self::iterate`), but with a user-specified
2472    /// [`Scheduler`] implementation.
2473    fn iterate_with_scheduler<F, C, T, S>(&self, constructor: F) -> Result<T, SchedulerError>
2474    where
2475        F: FnOnce(&mut IterativeCircuit<Self>) -> Result<(C, T), SchedulerError>,
2476        C: AsyncFn() -> Result<bool, SchedulerError> + 'static,
2477        S: Scheduler + 'static;
2478
2479    /// Add a child circuit that will iterate to a fixed point.
2480    ///
2481    /// For each parent clock cycle, the child circuit will iterate until
2482    /// reaching a fixed point, i.e., a state where the outputs of all
2483    /// operators are guaranteed to remain the same, should the nested clock
2484    /// continue ticking.
2485    ///
2486    /// The fixed point check is implemented by checking the following
2487    /// condition:
2488    ///
2489    /// * All operators in the circuit are in such a state that, if their inputs
2490    ///   remain constant (i.e., all future inputs are identical to the last
2491    ///   input), then their outputs remain constant too.
2492    ///
2493    /// This is a necessary and sufficient condition that is also easy to check
2494    /// by asking each operator if it is in a stable state (via the
2495    /// [`Operator::fixedpoint`](`super::operator_traits::Operator::fixedpoint`)
2496    /// API.
2497    ///
2498    /// # Warning
2499    ///
2500    /// The cost of checking this condition precisely can be high for some
2501    /// operators, which implement approximate checks instead.  For instance,
2502    /// delay operators ([`Z1`](`crate::operator::Z1`) and
2503    /// [`Z1Nested`](`crate::operator::Z1Nested`)) require storing the last
2504    /// two versions of the state instead of one and comparing them at each
2505    /// cycle.  Instead, they conservatively check for _specific_ fixed points,
2506    /// namely fixed points where both input and output of the operator are zero
2507    /// As a result, the circuit may fail to detect other fixed points and may
2508    /// iterate forever.
2509    ///
2510    /// The goal is to evolve the design so that circuits created using the
2511    /// high-level API (`Stream::xxx` methods) implement accurate fixed point
2512    /// checks, but there are currently no guardrails in the system against
2513    /// constructing non-compliant circuits.
2514    fn fixedpoint<F, T>(&self, constructor: F) -> Result<T, SchedulerError>
2515    where
2516        F: FnOnce(&mut IterativeCircuit<Self>) -> Result<T, SchedulerError>;
2517
2518    /// Add a child circuit that will iterate to a fixed point.
2519    ///
2520    /// Similar to [`fixedpoint`](`Self::fixedpoint`), but with a user-specified
2521    /// [`Scheduler`] implementation.
2522    fn fixedpoint_with_scheduler<F, T, S>(&self, constructor: F) -> Result<T, SchedulerError>
2523    where
2524        F: FnOnce(&mut IterativeCircuit<Self>) -> Result<T, SchedulerError>,
2525        S: Scheduler + 'static;
2526
2527    /// Make the contents of `parent_stream` available in the nested circuit
2528    /// via an [`ImportOperator`].
2529    ///
2530    /// Typically invoked via a convenience wrapper, e.g., [`Stream::delta0`].
2531    fn import_stream<I, O, Op>(
2532        &self,
2533        operator: Op,
2534        parent_stream: &Stream<Self::Parent, I>,
2535    ) -> Stream<Self, O>
2536    where
2537        Self::Parent: Circuit,
2538        I: Data,
2539        O: Data,
2540        Op: ImportOperator<I, O>;
2541
2542    /// Like [`Self::import_stream`] but overrides the ownership
2543    /// preference on the input stream with `input_preference.
2544    fn import_stream_with_preference<I, O, Op>(
2545        &self,
2546        operator: Op,
2547        parent_stream: &Stream<Self::Parent, I>,
2548        input_preference: OwnershipPreference,
2549    ) -> Stream<Self, O>
2550    where
2551        Self::Parent: Circuit,
2552        I: Data,
2553        O: Data,
2554        Op: ImportOperator<I, O>;
2555}
2556
2557/// A collection of edges in a circuit, indexed by source, destination, and stream id.
2558pub struct Edges {
2559    by_source: BTreeMap<NodeId, Vec<Rc<Edge>>>,
2560    by_destination: BTreeMap<NodeId, Vec<Rc<Edge>>>,
2561    by_stream: BTreeMap<Option<StreamId>, Vec<Rc<Edge>>>,
2562}
2563
2564impl Edges {
2565    fn new() -> Self {
2566        Self {
2567            by_source: BTreeMap::new(),
2568            by_destination: BTreeMap::new(),
2569            by_stream: BTreeMap::new(),
2570        }
2571    }
2572
2573    fn add_edge(&mut self, edge: Edge) {
2574        let edge = Rc::new(edge);
2575
2576        self.by_source
2577            .entry(edge.from)
2578            .or_default()
2579            .push(edge.clone());
2580        self.by_destination
2581            .entry(edge.to)
2582            .or_default()
2583            .push(edge.clone());
2584
2585        self.by_stream
2586            .entry(edge.stream.as_ref().map(|s| s.stream_id()))
2587            .or_default()
2588            .push(edge);
2589    }
2590
2591    fn extend<I>(&mut self, edges: I)
2592    where
2593        I: IntoIterator<Item = Edge>,
2594    {
2595        for edge in edges {
2596            self.add_edge(edge)
2597        }
2598    }
2599
2600    pub(crate) fn iter(&self) -> impl Iterator<Item = &Edge> {
2601        self.by_source
2602            .values()
2603            .flat_map(|edges| edges.iter().map(|edge| edge.as_ref()))
2604    }
2605
2606    pub(crate) fn get_by_stream_id(&self, stream_id: &Option<StreamId>) -> Option<&[Rc<Edge>]> {
2607        self.by_stream.get(stream_id).map(|v| v.as_slice())
2608    }
2609
2610    fn delete_stream(&mut self, stream_id: StreamId) {
2611        if let Some(edges) = self.by_stream.remove(&Some(stream_id)) {
2612            for edge in edges {
2613                if let Some(v) = self.by_source.get_mut(&edge.from) {
2614                    v.retain(|e| e.stream_id() != Some(stream_id))
2615                }
2616                if let Some(v) = self.by_destination.get_mut(&edge.to) {
2617                    v.retain(|e| e.stream_id() != Some(stream_id))
2618                }
2619            }
2620        }
2621    }
2622
2623    pub(crate) fn inputs_of(&self, node_id: NodeId) -> impl Iterator<Item = &Edge> {
2624        self.by_destination
2625            .get(&node_id)
2626            .into_iter()
2627            .flatten()
2628            .map(|edge| edge.as_ref())
2629    }
2630
2631    /// Nodes that depend on node_id directly.
2632    ///
2633    /// Nodes that have an incoming _dependency_ edge from `node_id`.
2634    pub(crate) fn depend_on(&self, node_id: NodeId) -> impl Iterator<Item = &Edge> {
2635        self.by_source.get(&node_id).into_iter().flat_map(|edges| {
2636            edges.iter().filter_map(|edge| {
2637                if edge.is_dependency() {
2638                    Some(edge.as_ref())
2639                } else {
2640                    None
2641                }
2642            })
2643        })
2644    }
2645
2646    /// Nodes that `node_id` depends on directly.
2647    ///
2648    /// Nodes that have an outgoing _dependency_ edge to `node_id`.
2649    pub(crate) fn dependencies_of(&self, node_id: NodeId) -> impl Iterator<Item = &Edge> {
2650        self.by_destination
2651            .get(&node_id)
2652            .into_iter()
2653            .flat_map(|edges| {
2654                edges.iter().filter_map(|edge| {
2655                    if edge.is_dependency() {
2656                        Some(edge.as_ref())
2657                    } else {
2658                        None
2659                    }
2660                })
2661            })
2662    }
2663
2664    fn clear(&mut self) {
2665        *self = Self::new();
2666    }
2667}
2668
2669/// A circuit consists of nodes and edges.  An edge from
2670/// node1 to node2 indicates that the output stream of node1
2671/// is connected to an input of node2.
2672struct CircuitInner<P>
2673where
2674    P: 'static,
2675{
2676    parent: P,
2677
2678    /// Root of the circuit tree.  `None` if this is the root circuit.
2679    root: Option<RootCircuit>,
2680
2681    root_scope: Scope,
2682
2683    /// Circuit's node id within the parent circuit.
2684    node_id: NodeId,
2685    global_node_id: GlobalNodeId,
2686    nodes: RefCell<Vec<RefCell<Box<dyn Node>>>>,
2687    edges: RefCell<Edges>,
2688    import_nodes: RefCell<Vec<NodeId>>,
2689    circuit_event_handlers: CircuitEventHandlers,
2690    scheduler_event_handlers: SchedulerEventHandlers,
2691    store: RefCell<CircuitCache>,
2692    last_stream_id: RefCell<StreamId>,
2693    metadata_exchange: MetadataExchange,
2694    balancer: Rc<Balancer>,
2695}
2696
2697impl<P> CircuitInner<P>
2698where
2699    P: 'static,
2700{
2701    #[allow(clippy::too_many_arguments)]
2702    fn new(
2703        parent: P,
2704        root: Option<RootCircuit>,
2705        root_scope: Scope,
2706        node_id: NodeId,
2707        global_node_id: GlobalNodeId,
2708        circuit_event_handlers: CircuitEventHandlers,
2709        scheduler_event_handlers: SchedulerEventHandlers,
2710        last_stream_id: RefCell<StreamId>,
2711    ) -> Self {
2712        let metadata_exchange = MetadataExchange::new();
2713
2714        Self {
2715            parent,
2716            root,
2717            root_scope,
2718            node_id,
2719            global_node_id,
2720            nodes: RefCell::new(Vec::new()),
2721            edges: RefCell::new(Edges::new()),
2722            import_nodes: RefCell::new(Vec::new()),
2723            circuit_event_handlers,
2724            scheduler_event_handlers,
2725            store: RefCell::new(TypedMap::new()),
2726            last_stream_id,
2727            metadata_exchange: metadata_exchange.clone(),
2728            balancer: Rc::new(Balancer::new(&metadata_exchange)),
2729        }
2730    }
2731
2732    fn add_edge(&self, edge: Edge) {
2733        self.edges.borrow_mut().add_edge(edge);
2734    }
2735
2736    fn add_node<N>(&self, mut node: N)
2737    where
2738        N: Node + 'static,
2739    {
2740        node.init();
2741        self.nodes
2742            .borrow_mut()
2743            .push(RefCell::new(Box::new(node) as Box<dyn Node>));
2744    }
2745
2746    fn add_import_node(&self, node_id: NodeId) {
2747        self.import_nodes.borrow_mut().push(node_id);
2748    }
2749
2750    fn import_nodes(&self) -> Vec<NodeId> {
2751        self.import_nodes.borrow().clone()
2752    }
2753
2754    fn clear(&self) {
2755        self.nodes.borrow_mut().clear();
2756        self.edges.borrow_mut().clear();
2757        self.store.borrow_mut().clear();
2758    }
2759
2760    fn register_circuit_event_handler<F>(&self, name: &str, handler: F)
2761    where
2762        F: Fn(&CircuitEvent) + 'static,
2763    {
2764        self.circuit_event_handlers.borrow_mut().insert(
2765            name.to_string(),
2766            Box::new(handler) as Box<dyn Fn(&CircuitEvent)>,
2767        );
2768    }
2769
2770    fn unregister_circuit_event_handler(&self, name: &str) -> bool {
2771        self.circuit_event_handlers
2772            .borrow_mut()
2773            .remove(name)
2774            .is_some()
2775    }
2776
2777    fn register_scheduler_event_handler<F>(&self, name: &str, handler: F)
2778    where
2779        F: FnMut(&SchedulerEvent<'_>) + 'static,
2780    {
2781        self.scheduler_event_handlers.borrow_mut().insert(
2782            name.to_string(),
2783            Box::new(handler) as Box<dyn FnMut(&SchedulerEvent<'_>)>,
2784        );
2785    }
2786
2787    fn unregister_scheduler_event_handler(&self, name: &str) -> bool {
2788        self.scheduler_event_handlers
2789            .borrow_mut()
2790            .remove(name)
2791            .is_some()
2792    }
2793
2794    fn log_circuit_event(&self, event: &CircuitEvent) {
2795        for (_, handler) in self.circuit_event_handlers.borrow().iter() {
2796            handler(event)
2797        }
2798    }
2799
2800    fn log_scheduler_event(&self, event: &SchedulerEvent<'_>) {
2801        for (_, handler) in self.scheduler_event_handlers.borrow_mut().iter_mut() {
2802            handler(event)
2803        }
2804    }
2805
2806    fn check_fixedpoint(&self, scope: Scope) -> bool {
2807        self.nodes.borrow().iter().all(|node| {
2808            /*
2809            if !res {
2810                let n = node.borrow();
2811                let n1 = n.deref();
2812                eprintln!("not fixed {:?} {}", n1.global_id(), n1.name());
2813            };
2814            */
2815            node.borrow().fixedpoint(scope)
2816        })
2817    }
2818}
2819
2820/// A circuit.
2821///
2822/// A single implementation that can operate as the top-level
2823/// circuit when instantiated with `P = ()` or a nested circuit,
2824/// with `P = ChildCircuit<..>` designating the parent circuit type.
2825pub struct ChildCircuit<P, T>
2826where
2827    P: 'static,
2828    T: Timestamp,
2829{
2830    inner: Rc<CircuitInner<P>>,
2831    time: Rc<RefCell<T>>,
2832}
2833
2834/// Top-level circuit.
2835///
2836/// `RootCircuit` is a specialization of [`ChildCircuit<P>`] with `P = ()`.  It
2837/// forms the top level of a possibly nested DBSP circuit.  Every use of DBSP
2838/// needs a top-level circuit and non-recursive queries, including all of
2839/// standard SQL, only needs a top-level circuit.
2840///
2841/// Input enters a circuit through the top level circuit only.  `RootCircuit`
2842/// has `add_input_*` methods for setting up input operators, which can only be
2843/// called within the callback passed to `RootCircuit::build`.  The data from
2844/// the input operators is represented as a [`Stream`], which may be in turn be
2845/// used as input for further operators, which are primarily instantiated via
2846/// methods on [`Stream`].  Stream output may be made available outside the
2847/// bounds of a circuit using [`Stream::output`].
2848pub type RootCircuit = ChildCircuit<(), ()>;
2849
2850pub type NestedCircuit = ChildCircuit<RootCircuit, <() as Timestamp>::Nested>;
2851
2852/// A child circuit with a nested clock.
2853pub type IterativeCircuit<P> = ChildCircuit<P, <<P as WithClock>::Time as Timestamp>::Nested>;
2854
2855/// A child circuit that runs on the same clock as the parent.
2856pub type NonIterativeCircuit<P> = ChildCircuit<P, <P as WithClock>::Time>;
2857
2858impl<P, T> Clone for ChildCircuit<P, T>
2859where
2860    P: 'static,
2861    T: Timestamp,
2862{
2863    fn clone(&self) -> Self {
2864        Self {
2865            inner: self.inner.clone(),
2866            time: self.time.clone(),
2867        }
2868    }
2869}
2870
2871impl<P, T> ChildCircuit<P, T>
2872where
2873    P: 'static,
2874    T: Timestamp,
2875{
2876    /// Immutably borrow the inner circuit.
2877    fn inner(&self) -> &CircuitInner<P> {
2878        &self.inner
2879    }
2880}
2881
2882impl RootCircuit {
2883    /// Creates a circuit and prepares it for execution by calling
2884    /// `constructor`.  The constructor should create input operators by calling
2885    /// [`RootCircuit::dyn_add_input_zset`] and related methods.  Each of these
2886    /// calls returns an input handle and a [`Stream`].  The `constructor` can
2887    /// call [`Stream`] methods to do computation, each of which yields further
2888    /// [`Stream`]s.  It can also use [`Stream::output`] to obtain an output
2889    /// handle.
2890    ///
2891    /// Returns a [`CircuitHandle`] with which the caller can control the
2892    /// circuit, plus a user-defined value returned by the constructor.  The
2893    /// `constructor` should use the latter to return the input and output
2894    /// handles it obtains, because these allow the caller to feed input into
2895    /// the circuit and read output from the circuit.
2896    ///
2897    /// The default scheduler, currently [`DynamicScheduler`], will decide the
2898    /// order in which to evaluate operators.  (This scheduler does not schedule
2899    /// processes or threads.)
2900    ///
2901    /// A client may use the returned [`CircuitHandle`] to run the circuit in
2902    /// the context of the current thread.  To instead run the circuit in a
2903    /// collection of worker threads, call [`Runtime::init_circuit`]
2904    /// instead.
2905    ///
2906    /// # Example
2907    ///
2908    /// ```
2909    /// use dbsp::{
2910    ///     operator::{Generator, Inspect},
2911    ///     Circuit, RootCircuit,
2912    /// };
2913    ///
2914    /// let circuit = RootCircuit::build(|circuit| {
2915    ///     // Add a source operator.
2916    ///     let source_stream = circuit.add_source(Generator::new(|| "Hello, world!".to_owned()));
2917    ///
2918    ///     // Add a unary operator and wire the source directly to it.
2919    ///     circuit.add_unary_operator(
2920    ///         Inspect::new(|n| println!("New output: {}", n)),
2921    ///         &source_stream,
2922    ///     );
2923    ///     Ok(())
2924    /// });
2925    /// ```
2926    pub fn build<F, T>(constructor: F) -> Result<(CircuitHandle, T), DbspError>
2927    where
2928        F: FnOnce(&mut RootCircuit) -> Result<T, AnyError>,
2929    {
2930        Self::build_with_scheduler::<F, T, DynamicScheduler>(constructor)
2931    }
2932
2933    /// Create a circuit and prepare it for execution.
2934    ///
2935    /// Similar to [`build`](`Self::build`), but with a user-specified
2936    /// [`Scheduler`] implementation that decides the order in which to evaluate
2937    /// operators.  (This scheduler does not schedule processes or threads.)
2938    pub fn build_with_scheduler<F, T, S>(constructor: F) -> Result<(CircuitHandle, T), DbspError>
2939    where
2940        F: FnOnce(&mut RootCircuit) -> Result<T, AnyError>,
2941        S: Scheduler + 'static,
2942    {
2943        // TODO: user LocalRuntime instead of Runtime + LocalSet when
2944        // tokio::LocalRuntime is stable.
2945        // Local tokio runtime that schedules operators on the current worker thread.
2946        let tokio_runtime = tokio::runtime::Builder::new_current_thread()
2947            .build()
2948            .map_err(|e| {
2949                DbspError::Scheduler(SchedulerError::TokioError {
2950                    error: e.to_string(),
2951                })
2952            })?;
2953
2954        let mut circuit = RootCircuit::new();
2955        let res = constructor(&mut circuit).map_err(DbspError::Constructor)?;
2956        let mut executor = Box::new(<OnceExecutor<S>>::new()) as Box<dyn Executor<RootCircuit>>;
2957        executor.prepare(&circuit, None)?;
2958
2959        // if Runtime::worker_index() == 0 {
2960        //     circuit.to_dot_file(
2961        //         |node| {
2962        //             Some(crate::utils::DotNodeAttributes::new().with_label(&format!(
2963        //                 "{}-{}-{}",
2964        //                 node.local_id(),
2965        //                 node.name(),
2966        //                 node.persistent_id().unwrap_or_default()
2967        //             )))
2968        //         },
2969        //         |edge| {
2970        //             let style = if edge.is_dependency() {
2971        //                 Some("dotted".to_string())
2972        //             } else {
2973        //                 None
2974        //             };
2975        //             let label = if let Some(stream) = &edge.stream {
2976        //                 Some(format!("consumers: {}", stream.num_consumers()))
2977        //             } else {
2978        //                 None
2979        //             };
2980        //             Some(
2981        //                 crate::utils::DotEdgeAttributes::new(edge.stream_id())
2982        //                     .with_style(style)
2983        //                     .with_label(label),
2984        //             )
2985        //         },
2986        //         "circuit.dot",
2987        //     );
2988        //     println!("circuit written to circuit.dot");
2989        // }
2990
2991        // Alternatively, `CircuitHandle` should expose `clock_start` and `clock_end`
2992        // APIs, so that the user can reset the circuit at runtime and start
2993        // evaluation from clean state without having to rebuild it from
2994        // scratch.
2995        circuit.log_scheduler_event(&SchedulerEvent::clock_start());
2996        circuit.clock_start(0);
2997        Ok((
2998            CircuitHandle {
2999                circuit,
3000                executor,
3001                tokio_runtime,
3002                replay_info: None,
3003            },
3004            res,
3005        ))
3006    }
3007}
3008
3009impl RootCircuit {
3010    // Create new top-level circuit.  Clients invoke this via the
3011    // [`RootCircuit::build`] API.
3012    fn new() -> Self {
3013        Self {
3014            inner: Rc::new(CircuitInner::new(
3015                (),
3016                None,
3017                0,
3018                NodeId::root(),
3019                GlobalNodeId::root(),
3020                Rc::new(RefCell::new(HashMap::new())),
3021                Rc::new(RefCell::new(HashMap::new())),
3022                RefCell::new(StreamId::new(0)),
3023            )),
3024            time: Rc::new(RefCell::new(())),
3025        }
3026    }
3027}
3028
3029impl RootCircuit {
3030    /// Attach a circuit event handler to the top-level circuit (see
3031    /// [`super::trace::CircuitEvent`] for a description of circuit events).
3032    ///
3033    /// This method should normally be called inside the closure passed to
3034    /// [`RootCircuit::build`] before adding any operators to the circuit, so
3035    /// that the handler gets to observe all nodes, edges, and subcircuits
3036    /// added to the circuit.
3037    ///
3038    /// `name` - user-readable name assigned to the handler.  If a handler with
3039    /// the same name exists, it will be replaced by the new handler.
3040    ///
3041    /// `handler` - user callback invoked on each circuit event (see
3042    /// [`super::trace::CircuitEvent`]).
3043    ///
3044    /// # Examples
3045    ///
3046    /// ```text
3047    /// TODO
3048    /// ```
3049    pub fn register_circuit_event_handler<F>(&self, name: &str, handler: F)
3050    where
3051        F: Fn(&CircuitEvent) + 'static,
3052    {
3053        self.inner().register_circuit_event_handler(name, handler);
3054    }
3055
3056    /// Remove a circuit event handler.  Returns `true` if a handler with the
3057    /// specified name had previously been registered and `false` otherwise.
3058    pub fn unregister_circuit_event_handler(&self, name: &str) -> bool {
3059        self.inner().unregister_circuit_event_handler(name)
3060    }
3061
3062    /// Attach a scheduler event handler to the top-level circuit (see
3063    /// [`super::trace::SchedulerEvent`] for a description of scheduler
3064    /// events).
3065    ///
3066    /// This method can be used during circuit construction, inside the closure
3067    /// provided to [`RootCircuit::build`].  Use
3068    /// [`CircuitHandle::register_scheduler_event_handler`],
3069    /// [`CircuitHandle::unregister_scheduler_event_handler`] to manipulate
3070    /// scheduler callbacks at runtime.
3071    ///
3072    /// `name` - user-readable name assigned to the handler.  If a handler with
3073    /// the same name exists, it will be replaced by the new handler.
3074    ///
3075    /// `handler` - user callback invoked on each scheduler event.
3076    pub fn register_scheduler_event_handler<F>(&self, name: &str, handler: F)
3077    where
3078        F: FnMut(&SchedulerEvent<'_>) + 'static,
3079    {
3080        self.inner().register_scheduler_event_handler(name, handler);
3081    }
3082
3083    /// Remove a scheduler event handler.  Returns `true` if a handler with the
3084    /// specified name had previously been registered and `false` otherwise.
3085    pub fn unregister_scheduler_event_handler(&self, name: &str) -> bool {
3086        self.inner().unregister_scheduler_event_handler(name)
3087    }
3088}
3089
3090impl<P, T> ChildCircuit<P, T>
3091where
3092    P: Circuit,
3093    T: Timestamp,
3094{
3095    /// Create an empty nested circuit of `parent`.
3096    fn with_parent(parent: P, id: NodeId) -> Self {
3097        let global_node_id = parent.global_node_id().child(id);
3098        let circuit_handlers = parent.circuit_event_handlers();
3099        let sched_handlers = parent.scheduler_event_handlers();
3100        let root_scope = parent.root_scope() + 1;
3101        let last_stream_id = parent.last_stream_id();
3102
3103        let root = parent.root_circuit();
3104
3105        ChildCircuit {
3106            inner: Rc::new(CircuitInner::new(
3107                parent,
3108                Some(root),
3109                root_scope,
3110                id,
3111                global_node_id,
3112                circuit_handlers,
3113                sched_handlers,
3114                last_stream_id,
3115            )),
3116            time: Rc::new(RefCell::new(Timestamp::clock_start())),
3117        }
3118    }
3119
3120    /// `true` if `self` is a subcircuit of `other`.
3121    pub fn is_child_of(&self, other: &P) -> bool {
3122        P::ptr_eq(&self.inner().parent, other)
3123    }
3124}
3125
3126// Internal API.
3127impl<P, T> ChildCircuit<P, T>
3128where
3129    P: 'static,
3130    T: Timestamp,
3131    Self: Circuit,
3132{
3133    /// Circuit's node id within the parent circuit.
3134    fn node_id(&self) -> NodeId {
3135        self.inner().node_id
3136    }
3137
3138    /// Add a node to the circuit.
3139    ///
3140    /// Allocates a new node id and invokes a user callback to create a new node
3141    /// instance. The callback may use the node id, e.g., to add an edge to
3142    /// this node.
3143    fn add_node<F, N, V>(&self, f: F) -> V
3144    where
3145        F: FnOnce(NodeId) -> (N, V),
3146        N: Node + 'static,
3147    {
3148        let id = self.inner().nodes.borrow().len();
3149
3150        // We don't hold a reference to `self.inner()` while calling `f`, so it can
3151        // safely modify the circuit, e.g., add edges.
3152        let (node, res) = f(NodeId(id));
3153        self.inner().add_node(node);
3154        res
3155    }
3156
3157    fn add_import_node(&self, node_id: NodeId) {
3158        self.inner().add_import_node(node_id);
3159    }
3160
3161    /// Like `add_node`, but the node is not created if the closure fails.
3162    fn try_add_node<F, N, V, E>(&self, f: F) -> Result<V, E>
3163    where
3164        F: FnOnce(NodeId) -> Result<(N, V), E>,
3165        N: Node + 'static,
3166    {
3167        let id = self.inner().nodes.borrow().len();
3168
3169        // We don't hold a reference to `self.inner()` while calling `f`, so it can
3170        // safely modify the circuit, e.g., add edges.
3171        let (node, res) = f(NodeId(id))?;
3172        self.inner().add_node(node);
3173        Ok(res)
3174    }
3175
3176    /// Send the specified `CircuitEvent` to all handlers attached to the
3177    /// circuit.
3178    fn log_circuit_event(&self, event: &CircuitEvent) {
3179        self.inner().log_circuit_event(event);
3180    }
3181
3182    /// Send the specified `SchedulerEvent` to all handlers attached to the
3183    /// circuit.
3184    pub(super) fn log_scheduler_event(&self, event: &SchedulerEvent<'_>) {
3185        self.inner().log_scheduler_event(event);
3186    }
3187}
3188
3189impl<P, T> CircuitBase for ChildCircuit<P, T>
3190where
3191    P: Clone + 'static,
3192    T: Timestamp,
3193{
3194    fn edges(&self) -> Ref<'_, Edges> {
3195        self.inner().edges.borrow()
3196    }
3197
3198    fn transitive_ancestors(&self) -> BTreeMap<NodeId, BTreeSet<NodeId>> {
3199        let edges = self.edges();
3200        let mut result = BTreeMap::new();
3201
3202        // For each node, compute its transitive ancestors using BFS
3203        for node_id in self.node_ids() {
3204            let mut ancestors = BTreeSet::new();
3205            let mut queue = vec![node_id];
3206
3207            // BFS to find all transitive ancestors
3208            while let Some(current) = queue.pop() {
3209                for edge in edges.inputs_of(current) {
3210                    let ancestor_node = edge.from;
3211                    if ancestors.insert(ancestor_node) {
3212                        queue.push(ancestor_node);
3213                    }
3214                }
3215            }
3216
3217            result.insert(node_id, ancestors);
3218        }
3219
3220        result
3221    }
3222
3223    fn edges_mut(&self) -> RefMut<'_, Edges> {
3224        self.inner().edges.borrow_mut()
3225    }
3226
3227    fn num_nodes(&self) -> usize {
3228        self.inner().nodes.borrow().len()
3229    }
3230
3231    fn clear(&mut self) {
3232        self.inner().clear();
3233    }
3234
3235    fn add_dependency(&self, from: NodeId, to: NodeId) {
3236        self.log_circuit_event(&CircuitEvent::dependency(
3237            self.global_node_id().child(from),
3238            self.global_node_id().child(to),
3239        ));
3240
3241        let origin = self.global_node_id().child(from);
3242        self.inner().add_edge(Edge {
3243            from,
3244            to,
3245            origin,
3246            stream: None,
3247            ownership_preference: None,
3248        });
3249    }
3250
3251    /// Apply `f` to the node with the specified `path` relative to `self`.
3252    fn map_node_relative(&self, path: &[NodeId], f: &mut dyn FnMut(&dyn Node)) {
3253        let nodes = self.inner().nodes.borrow();
3254        let node = nodes[path[0].0].borrow();
3255        if path.len() == 1 {
3256            f(node.as_ref())
3257        } else {
3258            node.map_child(&path[1..], &mut |node| f(node));
3259        }
3260    }
3261
3262    /// Apply `f` to the node with the specified `path` relative to `self`.
3263    fn map_node_mut_relative(&self, path: &[NodeId], f: &mut dyn FnMut(&mut dyn Node)) {
3264        let nodes = self.inner().nodes.borrow();
3265        let mut node = nodes[path[0].0].borrow_mut();
3266        if path.len() == 1 {
3267            f(node.as_mut())
3268        } else {
3269            node.map_child_mut(&path[1..], &mut |node| f(node));
3270        }
3271    }
3272
3273    fn map_nodes_recursive(
3274        &self,
3275        f: &mut dyn FnMut(&dyn Node) -> Result<(), DbspError>,
3276    ) -> Result<(), DbspError> {
3277        for node in self.inner().nodes.borrow().iter() {
3278            f(node.borrow().as_ref())?;
3279            node.borrow().map_nodes_recursive(f)?;
3280        }
3281        Ok(())
3282    }
3283
3284    fn map_nodes_recursive_mut(
3285        &mut self,
3286        f: &mut dyn FnMut(&mut dyn Node) -> Result<(), DbspError>,
3287    ) -> Result<(), DbspError> {
3288        for node in self.inner().nodes.borrow_mut().iter_mut() {
3289            f(node.borrow_mut().as_mut())?;
3290            node.borrow_mut().map_nodes_recursive_mut(f)?;
3291        }
3292
3293        Ok(())
3294    }
3295
3296    fn map_local_nodes(
3297        &self,
3298        f: &mut dyn FnMut(&dyn Node) -> Result<(), DbspError>,
3299    ) -> Result<(), DbspError> {
3300        for node in self.inner().nodes.borrow().iter() {
3301            f(node.borrow().as_ref())?;
3302        }
3303        Ok(())
3304    }
3305
3306    fn map_local_nodes_mut(
3307        &self,
3308        f: &mut dyn FnMut(&mut dyn Node) -> Result<(), DbspError>,
3309    ) -> Result<(), DbspError> {
3310        for node in self.inner().nodes.borrow_mut().iter_mut() {
3311            f(node.borrow_mut().as_mut())?;
3312        }
3313
3314        Ok(())
3315    }
3316
3317    fn apply_local_node_mut(&self, id: NodeId, f: &mut dyn FnMut(&mut dyn Node)) {
3318        self.map_node_mut_relative(&[id], &mut |node| f(node));
3319    }
3320
3321    fn map_subcircuits(
3322        &self,
3323        f: &mut dyn FnMut(&dyn CircuitBase) -> Result<(), DbspError>,
3324    ) -> Result<(), DbspError> {
3325        for node in self.inner().nodes.borrow().iter() {
3326            let node = node.borrow();
3327            if let Some(child_circuit) = node.as_circuit() {
3328                f(child_circuit)?;
3329            }
3330        }
3331        Ok(())
3332    }
3333
3334    fn set_node_label(&self, id: &GlobalNodeId, key: &str, val: &str) {
3335        self.map_node_mut(id, &mut |node| node.set_label(key, val));
3336    }
3337
3338    fn get_node_label(&self, id: &GlobalNodeId, key: &str) -> Option<String> {
3339        self.map_node(id, &mut |node| node.get_label(key).map(str::to_string))
3340    }
3341
3342    fn global_id(&self) -> &GlobalNodeId {
3343        &self.inner().global_node_id
3344    }
3345
3346    /// Returns vector of local node ids in the circuit.
3347    fn node_ids(&self) -> Vec<NodeId> {
3348        self.inner()
3349            .nodes
3350            .borrow()
3351            .iter()
3352            .map(|node| node.borrow().local_id())
3353            .collect()
3354    }
3355
3356    fn import_nodes(&self) -> Vec<NodeId> {
3357        self.inner().import_nodes()
3358    }
3359
3360    fn allocate_stream_id(&self) -> StreamId {
3361        let circuit = self.inner();
3362        let mut last_stream_id = circuit.last_stream_id.borrow_mut();
3363        last_stream_id.0 += 1;
3364        *last_stream_id
3365    }
3366
3367    fn last_stream_id(&self) -> RefCell<StreamId> {
3368        self.inner().last_stream_id.clone()
3369    }
3370
3371    fn root_scope(&self) -> Scope {
3372        self.inner().root_scope
3373    }
3374
3375    fn node_id(&self) -> NodeId {
3376        self.inner().node_id
3377    }
3378
3379    fn global_node_id(&self) -> GlobalNodeId {
3380        self.inner().global_node_id.clone()
3381    }
3382
3383    fn check_fixedpoint(&self, scope: Scope) -> bool {
3384        self.inner().check_fixedpoint(scope)
3385    }
3386
3387    fn metadata_exchange(&self) -> &MetadataExchange {
3388        &self.inner().metadata_exchange
3389    }
3390
3391    fn balancer(&self) -> &Balancer {
3392        &self.inner().balancer
3393    }
3394
3395    fn set_balancer_hint(
3396        &self,
3397        global_node_id: &GlobalNodeId,
3398        hint: BalancerHint,
3399    ) -> Result<(), DbspError> {
3400        if global_node_id.parent_id() != Some(GlobalNodeId::root()) {
3401            return Err(DbspError::Balancer(BalancerError::NonTopLevelNode(
3402                global_node_id.clone(),
3403            )));
3404        }
3405
3406        self.inner()
3407            .balancer
3408            .set_hint(global_node_id.local_node_id().unwrap(), hint)
3409    }
3410
3411    fn get_current_balancer_policy(&self) -> BTreeMap<NodeId, PartitioningPolicy> {
3412        self.inner().balancer.get_policy()
3413    }
3414}
3415
3416impl<P, T> Circuit for ChildCircuit<P, T>
3417where
3418    P: Clone + 'static,
3419    T: Timestamp,
3420{
3421    type Parent = P;
3422
3423    fn parent(&self) -> P {
3424        self.inner().parent.clone()
3425    }
3426
3427    fn root_circuit(&self) -> RootCircuit {
3428        if <dyn Any>::is::<RootCircuit>(self) {
3429            unsafe { transmute::<&Self, &RootCircuit>(self) }.clone()
3430        } else {
3431            self.inner().root.as_ref().unwrap().clone()
3432        }
3433    }
3434
3435    fn map_node<V>(&self, id: &GlobalNodeId, f: &mut dyn FnMut(&dyn Node) -> V) -> V {
3436        let path = id.path();
3437        let mut result: Option<V> = None;
3438
3439        assert!(path.starts_with(self.global_id().path()));
3440
3441        self.map_node_relative(
3442            path.strip_prefix(self.global_id().path()).unwrap(),
3443            &mut |node| result = Some(f(node)),
3444        );
3445        result.unwrap()
3446    }
3447
3448    fn map_node_mut<V>(&self, id: &GlobalNodeId, f: &mut dyn FnMut(&mut dyn Node) -> V) -> V {
3449        let path = id.path();
3450        let mut result: Option<V> = None;
3451
3452        assert!(path.starts_with(self.global_id().path()));
3453
3454        self.map_node_mut_relative(
3455            path.strip_prefix(self.global_id().path()).unwrap(),
3456            &mut |node| result = Some(f(node)),
3457        );
3458        result.unwrap()
3459    }
3460
3461    fn map_local_node_mut<V>(&self, id: NodeId, f: &mut dyn FnMut(&mut dyn Node) -> V) -> V {
3462        let mut result: Option<V> = None;
3463
3464        self.map_node_mut_relative(&[id], &mut |node| result = Some(f(node)));
3465        result.unwrap()
3466    }
3467
3468    fn ptr_eq(this: &Self, other: &Self) -> bool {
3469        Rc::ptr_eq(&this.inner, &other.inner)
3470    }
3471
3472    fn circuit_event_handlers(&self) -> CircuitEventHandlers {
3473        self.inner().circuit_event_handlers.clone()
3474    }
3475
3476    fn scheduler_event_handlers(&self) -> SchedulerEventHandlers {
3477        self.inner().scheduler_event_handlers.clone()
3478    }
3479
3480    fn log_circuit_event(&self, event: &CircuitEvent) {
3481        self.inner().log_circuit_event(event);
3482    }
3483
3484    fn log_scheduler_event(&self, event: &SchedulerEvent<'_>) {
3485        self.inner().log_scheduler_event(event);
3486    }
3487
3488    fn cache_get_or_insert_with<K, F>(&self, key: K, mut f: F) -> RefMut<'_, K::Value>
3489    where
3490        K: 'static + TypedMapKey<CircuitStoreMarker>,
3491        F: FnMut() -> K::Value,
3492    {
3493        // Don't use `store.entry()`, since `f` may need to perform
3494        // its own cache lookup.
3495        if self.inner().store.borrow().contains_key(&key) {
3496            return RefMut::map(self.inner().store.borrow_mut(), |store| {
3497                store.get_mut(&key).unwrap()
3498            });
3499        }
3500
3501        let new = f();
3502
3503        // TODO: Use `RefMut::filter_map()` to only perform one lookup in the happy path
3504        //       https://github.com/rust-lang/rust/issues/81061
3505        RefMut::map(self.inner().store.borrow_mut(), |store| {
3506            store.entry(key).or_insert(new)
3507        })
3508    }
3509
3510    fn connect_stream<V: 'static>(
3511        &self,
3512        stream: &Stream<Self, V>,
3513        to: NodeId,
3514        ownership_preference: OwnershipPreference,
3515    ) {
3516        self.log_circuit_event(&CircuitEvent::stream(
3517            stream.origin_node_id().clone(),
3518            self.global_node_id().child(to),
3519            ownership_preference,
3520        ));
3521
3522        debug_assert_eq!(self.global_node_id(), stream.circuit.global_node_id());
3523        self.inner().add_edge(Edge {
3524            from: stream.local_node_id(),
3525            to,
3526            origin: stream.origin_node_id().clone(),
3527            stream: Some(Box::new(stream.clone())),
3528            ownership_preference: Some(ownership_preference),
3529        });
3530    }
3531
3532    fn tick(&self) {
3533        let mut time = self.time.borrow_mut();
3534        *time = time.advance(0);
3535    }
3536
3537    fn clock_start(&self, scope: Scope) {
3538        for node in self.inner().nodes.borrow_mut().iter_mut() {
3539            node.borrow_mut().clock_start(scope);
3540        }
3541    }
3542
3543    fn clock_end(&self, scope: Scope) {
3544        for node in self.inner().nodes.borrow_mut().iter_mut() {
3545            node.borrow_mut().clock_end(scope);
3546        }
3547
3548        let mut time = self.time.borrow_mut();
3549        *time = time.advance(scope + 1);
3550    }
3551
3552    fn ready(&self, id: NodeId) -> bool {
3553        self.inner().nodes.borrow()[id.0].borrow().ready()
3554    }
3555
3556    fn cache_insert<K>(&self, key: K, val: K::Value)
3557    where
3558        K: TypedMapKey<CircuitStoreMarker> + 'static,
3559    {
3560        self.inner().store.borrow_mut().insert(key, val);
3561    }
3562
3563    fn cache_contains<K>(&self, key: &K) -> bool
3564    where
3565        K: TypedMapKey<CircuitStoreMarker> + 'static,
3566    {
3567        self.inner().store.borrow().contains_key(key)
3568    }
3569
3570    fn cache_get<K>(&self, key: &K) -> Option<K::Value>
3571    where
3572        K: TypedMapKey<CircuitStoreMarker> + 'static,
3573        K::Value: Clone,
3574    {
3575        self.inner().store.borrow().get(key).cloned()
3576    }
3577
3578    fn register_ready_callback(&self, id: NodeId, cb: Box<dyn Fn() + Send + Sync>) {
3579        self.inner().nodes.borrow()[id.0]
3580            .borrow_mut()
3581            .register_ready_callback(cb);
3582    }
3583
3584    fn is_async_node(&self, id: NodeId) -> bool {
3585        self.inner().nodes.borrow()[id.0].borrow().is_async()
3586    }
3587
3588    // Justification: the scheduler must not call `eval()` on a node twice.
3589    #[allow(clippy::await_holding_refcell_ref)]
3590    async fn eval_node(&self, id: NodeId) -> Result<Option<Position>, SchedulerError> {
3591        let circuit = self.inner();
3592        debug_assert!(id.0 < circuit.nodes.borrow().len());
3593
3594        // Notify loggers while holding a reference to the inner circuit.
3595        // We normally avoid this, since a nested call from event handler
3596        // will panic in `self.inner()`, but we do it here as an
3597        // optimization.
3598        circuit.log_scheduler_event(&SchedulerEvent::eval_start(
3599            circuit.nodes.borrow()[id.0].borrow().as_ref(),
3600        ));
3601
3602        let progress = circuit.nodes.borrow()[id.0].borrow_mut().eval().await?;
3603
3604        circuit.log_scheduler_event(&SchedulerEvent::eval_end(
3605            circuit.nodes.borrow()[id.0].borrow().as_ref(),
3606        ));
3607
3608        Ok(progress)
3609    }
3610
3611    // Justification: the scheduler must not call `eval()` on a node twice.
3612    fn eval_import_node(&self, id: NodeId) {
3613        let circuit = self.inner();
3614        debug_assert!(id.0 < circuit.nodes.borrow().len());
3615        debug_assert!(circuit.import_nodes().contains(&id));
3616
3617        circuit.nodes.borrow()[id.0].borrow_mut().import();
3618    }
3619
3620    fn flush_node(&self, id: NodeId) {
3621        let circuit = self.inner();
3622        debug_assert!(id.0 < circuit.nodes.borrow().len());
3623
3624        circuit.nodes.borrow()[id.0].borrow_mut().flush();
3625    }
3626
3627    fn is_flush_complete(&self, id: NodeId) -> bool {
3628        let circuit = self.inner();
3629        debug_assert!(id.0 < circuit.nodes.borrow().len());
3630
3631        circuit.nodes.borrow()[id.0].borrow().is_flush_complete()
3632    }
3633
3634    #[track_caller]
3635    fn region<F, V>(&self, name: &str, f: F) -> V
3636    where
3637        F: FnOnce() -> V,
3638    {
3639        self.log_circuit_event(&CircuitEvent::push_region(name, Some(Location::caller())));
3640        let res = f();
3641        self.log_circuit_event(&CircuitEvent::pop_region());
3642        res
3643    }
3644
3645    fn add_preprocessor(&self, preprocessor_node_id: NodeId) {
3646        for node in self.inner().nodes.borrow_mut().iter() {
3647            if node.borrow().is_input() {
3648                self.add_dependency(preprocessor_node_id, node.borrow().local_id());
3649            }
3650        }
3651    }
3652
3653    /// Add a source operator to the circuit.  See [`SourceOperator`].
3654    fn add_source<O, Op>(&self, operator: Op) -> Stream<Self, O>
3655    where
3656        O: Data,
3657        Op: SourceOperator<O>,
3658    {
3659        self.add_node(|id| {
3660            self.log_circuit_event(&CircuitEvent::operator(
3661                GlobalNodeId::child_of(self, id),
3662                operator.name(),
3663                operator.location(),
3664            ));
3665
3666            let node = SourceNode::new(operator, self.clone(), id);
3667            let output_stream = node.output_stream();
3668            (node, output_stream)
3669        })
3670    }
3671
3672    fn add_exchange<I, SndOp, O, RcvOp>(
3673        &self,
3674        sender: SndOp,
3675        receiver: RcvOp,
3676        input_stream: &Stream<Self, I>,
3677    ) -> Stream<Self, O>
3678    where
3679        I: Data,
3680        O: Data,
3681        SndOp: SinkOperator<I>,
3682        RcvOp: SourceOperator<O>,
3683    {
3684        let preference = sender.input_preference();
3685        self.add_exchange_with_preference(sender, receiver, input_stream, preference)
3686    }
3687
3688    fn add_exchange_with_preference<I, SndOp, O, RcvOp>(
3689        &self,
3690        sender: SndOp,
3691        receiver: RcvOp,
3692        input_stream: &Stream<Self, I>,
3693        input_preference: OwnershipPreference,
3694    ) -> Stream<Self, O>
3695    where
3696        I: Data,
3697        O: Data,
3698        SndOp: SinkOperator<I>,
3699        RcvOp: SourceOperator<O>,
3700    {
3701        let sender_id = self.add_node(|id| {
3702            self.log_circuit_event(&CircuitEvent::operator(
3703                GlobalNodeId::child_of(self, id),
3704                sender.name(),
3705                sender.location(),
3706            ));
3707
3708            let node = SinkNode::new(sender, input_stream.clone(), self.clone(), id);
3709            self.connect_stream(input_stream, id, input_preference);
3710            (node, id)
3711        });
3712
3713        let output_stream = self.add_node(|id| {
3714            self.log_circuit_event(&CircuitEvent::operator(
3715                GlobalNodeId::child_of(self, id),
3716                receiver.name(),
3717                receiver.location(),
3718            ));
3719
3720            let node = SourceNode::new(receiver, self.clone(), id);
3721            let output_stream = node.output_stream();
3722            (node, output_stream)
3723        });
3724
3725        self.add_dependency(sender_id, output_stream.local_node_id());
3726        output_stream
3727    }
3728
3729    fn add_sink<I, Op>(&self, operator: Op, input_stream: &Stream<Self, I>) -> GlobalNodeId
3730    where
3731        I: Data,
3732        Op: SinkOperator<I>,
3733    {
3734        let preference = operator.input_preference();
3735        self.add_sink_with_preference(operator, input_stream, preference)
3736    }
3737
3738    fn add_sink_with_preference<I, Op>(
3739        &self,
3740        operator: Op,
3741        input_stream: &Stream<Self, I>,
3742        input_preference: OwnershipPreference,
3743    ) -> GlobalNodeId
3744    where
3745        I: Data,
3746        Op: SinkOperator<I>,
3747    {
3748        self.add_node(|id| {
3749            let global_node_id = GlobalNodeId::child_of(self, id);
3750            // Log the operator event before the connection event, so that handlers
3751            // don't observe edges that connect to nodes they haven't seen yet.
3752            self.log_circuit_event(&CircuitEvent::operator(
3753                global_node_id.clone(),
3754                operator.name(),
3755                operator.location(),
3756            ));
3757
3758            self.connect_stream(input_stream, id, input_preference);
3759            (
3760                SinkNode::new(operator, input_stream.clone(), self.clone(), id),
3761                global_node_id,
3762            )
3763        })
3764    }
3765
3766    /// Add a binary sink operator (see [`BinarySinkOperator`]).
3767    fn add_binary_sink<I1, I2, Op>(
3768        &self,
3769        operator: Op,
3770        input_stream1: &Stream<Self, I1>,
3771        input_stream2: &Stream<Self, I2>,
3772    ) where
3773        I1: Data,
3774        I2: Data,
3775        Op: BinarySinkOperator<I1, I2>,
3776    {
3777        let (preference1, preference2) = operator.input_preference();
3778        self.add_binary_sink_with_preference(
3779            operator,
3780            (input_stream1, preference1),
3781            (input_stream2, preference2),
3782        )
3783    }
3784
3785    fn add_binary_sink_with_preference<I1, I2, Op>(
3786        &self,
3787        operator: Op,
3788        input_stream1: (&Stream<Self, I1>, OwnershipPreference),
3789        input_stream2: (&Stream<Self, I2>, OwnershipPreference),
3790    ) where
3791        I1: Data,
3792        I2: Data,
3793        Op: BinarySinkOperator<I1, I2>,
3794    {
3795        let (input_stream1, input_preference1) = input_stream1;
3796        let (input_stream2, input_preference2) = input_stream2;
3797
3798        self.add_node(|id| {
3799            self.log_circuit_event(&CircuitEvent::operator(
3800                GlobalNodeId::child_of(self, id),
3801                operator.name(),
3802                operator.location(),
3803            ));
3804
3805            let node = BinarySinkNode::new(
3806                operator,
3807                input_stream1.clone(),
3808                input_stream2.clone(),
3809                self.clone(),
3810                id,
3811            );
3812            self.connect_stream(input_stream1, id, input_preference1);
3813            self.connect_stream(input_stream2, id, input_preference2);
3814            (node, ())
3815        });
3816    }
3817
3818    /// Add a ternary sink operator (see [`TernarySinkOperator`]).
3819    fn add_ternary_sink<I1, I2, I3, Op>(
3820        &self,
3821        operator: Op,
3822        input_stream1: &Stream<Self, I1>,
3823        input_stream2: &Stream<Self, I2>,
3824        input_stream3: &Stream<Self, I3>,
3825    ) -> GlobalNodeId
3826    where
3827        I1: Data,
3828        I2: Data,
3829        I3: Data,
3830        Op: TernarySinkOperator<I1, I2, I3>,
3831    {
3832        let (preference1, preference2, preference3) = operator.input_preference();
3833        self.add_ternary_sink_with_preference(
3834            operator,
3835            (input_stream1, preference1),
3836            (input_stream2, preference2),
3837            (input_stream3, preference3),
3838        )
3839    }
3840
3841    fn add_ternary_sink_with_preference<I1, I2, I3, Op>(
3842        &self,
3843        operator: Op,
3844        input_stream1: (&Stream<Self, I1>, OwnershipPreference),
3845        input_stream2: (&Stream<Self, I2>, OwnershipPreference),
3846        input_stream3: (&Stream<Self, I3>, OwnershipPreference),
3847    ) -> GlobalNodeId
3848    where
3849        I1: Data,
3850        I2: Data,
3851        I3: Data,
3852        Op: TernarySinkOperator<I1, I2, I3>,
3853    {
3854        let (input_stream1, input_preference1) = input_stream1;
3855        let (input_stream2, input_preference2) = input_stream2;
3856        let (input_stream3, input_preference3) = input_stream3;
3857
3858        self.add_node(|id| {
3859            let global_node_id = GlobalNodeId::child_of(self, id);
3860
3861            self.log_circuit_event(&CircuitEvent::operator(
3862                GlobalNodeId::child_of(self, id),
3863                operator.name(),
3864                operator.location(),
3865            ));
3866
3867            let node = TernarySinkNode::new(
3868                operator,
3869                input_stream1.clone(),
3870                input_stream2.clone(),
3871                input_stream3.clone(),
3872                self.clone(),
3873                id,
3874            );
3875            self.connect_stream(input_stream1, id, input_preference1);
3876            self.connect_stream(input_stream2, id, input_preference2);
3877            self.connect_stream(input_stream3, id, input_preference3);
3878            (node, global_node_id)
3879        })
3880    }
3881
3882    fn add_unary_operator<I, O, Op>(
3883        &self,
3884        operator: Op,
3885        input_stream: &Stream<Self, I>,
3886    ) -> Stream<Self, O>
3887    where
3888        I: Data,
3889        O: Data,
3890        Op: UnaryOperator<I, O>,
3891    {
3892        let preference = operator.input_preference();
3893        self.add_unary_operator_with_preference(operator, input_stream, preference)
3894    }
3895
3896    fn add_unary_operator_with_preference<I, O, Op>(
3897        &self,
3898        operator: Op,
3899        input_stream: &Stream<Self, I>,
3900        input_preference: OwnershipPreference,
3901    ) -> Stream<Self, O>
3902    where
3903        I: Data,
3904        O: Data,
3905        Op: UnaryOperator<I, O>,
3906    {
3907        self.add_node(|id| {
3908            self.log_circuit_event(&CircuitEvent::operator(
3909                GlobalNodeId::child_of(self, id),
3910                operator.name(),
3911                operator.location(),
3912            ));
3913
3914            let node = UnaryNode::new(operator, input_stream.clone(), self.clone(), id);
3915            let output_stream = node.output_stream();
3916            self.connect_stream(input_stream, id, input_preference);
3917            (node, output_stream)
3918        })
3919    }
3920
3921    fn add_binary_operator<I1, I2, O, Op>(
3922        &self,
3923        operator: Op,
3924        input_stream1: &Stream<Self, I1>,
3925        input_stream2: &Stream<Self, I2>,
3926    ) -> Stream<Self, O>
3927    where
3928        I1: Data,
3929        I2: Data,
3930        O: Data,
3931        Op: BinaryOperator<I1, I2, O>,
3932    {
3933        let (pref1, pref2) = operator.input_preference();
3934        self.add_binary_operator_with_preference(
3935            operator,
3936            (input_stream1, pref1),
3937            (input_stream2, pref2),
3938        )
3939    }
3940
3941    fn add_binary_operator_with_preference<I1, I2, O, Op>(
3942        &self,
3943        operator: Op,
3944        input_stream1: (&Stream<Self, I1>, OwnershipPreference),
3945        input_stream2: (&Stream<Self, I2>, OwnershipPreference),
3946    ) -> Stream<Self, O>
3947    where
3948        I1: Data,
3949        I2: Data,
3950        O: Data,
3951        Op: BinaryOperator<I1, I2, O>,
3952    {
3953        let (input_stream1, input_preference1) = input_stream1;
3954        let (input_stream2, input_preference2) = input_stream2;
3955
3956        self.add_node(|id| {
3957            self.log_circuit_event(&CircuitEvent::operator(
3958                GlobalNodeId::child_of(self, id),
3959                operator.name(),
3960                operator.location(),
3961            ));
3962
3963            let node = BinaryNode::new(
3964                operator,
3965                input_stream1.clone(),
3966                input_stream2.clone(),
3967                self.clone(),
3968                id,
3969            );
3970            let output_stream = node.output_stream();
3971            self.connect_stream(input_stream1, id, input_preference1);
3972            self.connect_stream(input_stream2, id, input_preference2);
3973            (node, output_stream)
3974        })
3975    }
3976
3977    fn add_ternary_operator<I1, I2, I3, O, Op>(
3978        &self,
3979        operator: Op,
3980        input_stream1: &Stream<Self, I1>,
3981        input_stream2: &Stream<Self, I2>,
3982        input_stream3: &Stream<Self, I3>,
3983    ) -> Stream<Self, O>
3984    where
3985        I1: Data,
3986        I2: Data,
3987        I3: Data,
3988        O: Data,
3989        Op: TernaryOperator<I1, I2, I3, O>,
3990    {
3991        let (pref1, pref2, pref3) = operator.input_preference();
3992        self.add_ternary_operator_with_preference(
3993            operator,
3994            (input_stream1, pref1),
3995            (input_stream2, pref2),
3996            (input_stream3, pref3),
3997        )
3998    }
3999
4000    #[allow(clippy::too_many_arguments)]
4001    fn add_ternary_operator_with_preference<I1, I2, I3, O, Op>(
4002        &self,
4003        operator: Op,
4004        input_stream1: (&Stream<Self, I1>, OwnershipPreference),
4005        input_stream2: (&Stream<Self, I2>, OwnershipPreference),
4006        input_stream3: (&Stream<Self, I3>, OwnershipPreference),
4007    ) -> Stream<Self, O>
4008    where
4009        I1: Data,
4010        I2: Data,
4011        I3: Data,
4012        O: Data,
4013        Op: TernaryOperator<I1, I2, I3, O>,
4014    {
4015        let (input_stream1, input_preference1) = input_stream1;
4016        let (input_stream2, input_preference2) = input_stream2;
4017        let (input_stream3, input_preference3) = input_stream3;
4018
4019        self.add_node(|id| {
4020            self.log_circuit_event(&CircuitEvent::operator(
4021                GlobalNodeId::child_of(self, id),
4022                operator.name(),
4023                operator.location(),
4024            ));
4025
4026            let node = TernaryNode::new(
4027                operator,
4028                input_stream1.clone(),
4029                input_stream2.clone(),
4030                input_stream3.clone(),
4031                self.clone(),
4032                id,
4033            );
4034            let output_stream = node.output_stream();
4035            self.connect_stream(input_stream1, id, input_preference1);
4036            self.connect_stream(input_stream2, id, input_preference2);
4037            self.connect_stream(input_stream3, id, input_preference3);
4038            (node, output_stream)
4039        })
4040    }
4041
4042    fn add_quaternary_operator<I1, I2, I3, I4, O, Op>(
4043        &self,
4044        operator: Op,
4045        input_stream1: &Stream<Self, I1>,
4046        input_stream2: &Stream<Self, I2>,
4047        input_stream3: &Stream<Self, I3>,
4048        input_stream4: &Stream<Self, I4>,
4049    ) -> Stream<Self, O>
4050    where
4051        I1: Data,
4052        I2: Data,
4053        I3: Data,
4054        I4: Data,
4055        O: Data,
4056        Op: QuaternaryOperator<I1, I2, I3, I4, O>,
4057    {
4058        let (pref1, pref2, pref3, pref4) = operator.input_preference();
4059        self.add_quaternary_operator_with_preference(
4060            operator,
4061            (input_stream1, pref1),
4062            (input_stream2, pref2),
4063            (input_stream3, pref3),
4064            (input_stream4, pref4),
4065        )
4066    }
4067
4068    #[allow(clippy::too_many_arguments)]
4069    fn add_quaternary_operator_with_preference<I1, I2, I3, I4, O, Op>(
4070        &self,
4071        operator: Op,
4072        input_stream1: (&Stream<Self, I1>, OwnershipPreference),
4073        input_stream2: (&Stream<Self, I2>, OwnershipPreference),
4074        input_stream3: (&Stream<Self, I3>, OwnershipPreference),
4075        input_stream4: (&Stream<Self, I4>, OwnershipPreference),
4076    ) -> Stream<Self, O>
4077    where
4078        I1: Data,
4079        I2: Data,
4080        I3: Data,
4081        I4: Data,
4082        O: Data,
4083        Op: QuaternaryOperator<I1, I2, I3, I4, O>,
4084    {
4085        let (input_stream1, input_preference1) = input_stream1;
4086        let (input_stream2, input_preference2) = input_stream2;
4087        let (input_stream3, input_preference3) = input_stream3;
4088        let (input_stream4, input_preference4) = input_stream4;
4089
4090        self.add_node(|id| {
4091            self.log_circuit_event(&CircuitEvent::operator(
4092                GlobalNodeId::child_of(self, id),
4093                operator.name(),
4094                operator.location(),
4095            ));
4096
4097            let node = QuaternaryNode::new(
4098                operator,
4099                input_stream1.clone(),
4100                input_stream2.clone(),
4101                input_stream3.clone(),
4102                input_stream4.clone(),
4103                self.clone(),
4104                id,
4105            );
4106            let output_stream = node.output_stream();
4107            self.connect_stream(input_stream1, id, input_preference1);
4108            self.connect_stream(input_stream2, id, input_preference2);
4109            self.connect_stream(input_stream3, id, input_preference3);
4110            self.connect_stream(input_stream4, id, input_preference4);
4111            (node, output_stream)
4112        })
4113    }
4114
4115    fn add_nary_operator<'a, I, O, Op, Iter>(
4116        &'a self,
4117        operator: Op,
4118        input_streams: Iter,
4119    ) -> Stream<Self, O>
4120    where
4121        I: Data,
4122        O: Data,
4123        Op: NaryOperator<I, O>,
4124        Iter: IntoIterator<Item = &'a Stream<Self, I>>,
4125    {
4126        let pref = operator.input_preference();
4127        self.add_nary_operator_with_preference(operator, input_streams, pref)
4128    }
4129
4130    fn add_nary_operator_with_preference<'a, I, O, Op, Iter>(
4131        &'a self,
4132        operator: Op,
4133        input_streams: Iter,
4134        input_preference: OwnershipPreference,
4135    ) -> Stream<Self, O>
4136    where
4137        I: Data,
4138        O: Data,
4139        Op: NaryOperator<I, O>,
4140        Iter: IntoIterator<Item = &'a Stream<Self, I>>,
4141    {
4142        let input_streams: Vec<Stream<_, _>> = input_streams.into_iter().cloned().collect();
4143        self.add_node(|id| {
4144            self.log_circuit_event(&CircuitEvent::operator(
4145                GlobalNodeId::child_of(self, id),
4146                operator.name(),
4147                operator.location(),
4148            ));
4149
4150            let node = NaryNode::new(operator, input_streams.clone(), self.clone(), id);
4151            let output_stream = node.output_stream();
4152            for stream in input_streams.iter() {
4153                self.connect_stream(stream, id, input_preference);
4154            }
4155            (node, output_stream)
4156        })
4157    }
4158
4159    fn add_feedback<I, O, Op>(
4160        &self,
4161        operator: Op,
4162    ) -> (Stream<Self, O>, FeedbackConnector<Self, I, O, Op>)
4163    where
4164        I: Data,
4165        O: Data,
4166        Op: StrictUnaryOperator<I, O>,
4167    {
4168        self.add_node(|id| {
4169            self.log_circuit_event(&CircuitEvent::strict_operator_output(
4170                GlobalNodeId::child_of(self, id),
4171                operator.name(),
4172                operator.location(),
4173            ));
4174
4175            let operator = Rc::new(RefCell::new(operator));
4176            let connector = FeedbackConnector::new(id, self.clone(), operator.clone());
4177            let output_node = FeedbackOutputNode::new(operator, self.clone(), id);
4178            let local = output_node.output_stream();
4179            (output_node, (local, connector))
4180        })
4181    }
4182
4183    fn add_feedback_with_export<I, O, Op>(
4184        &self,
4185        operator: Op,
4186    ) -> (ExportStream<Self, O>, FeedbackConnector<Self, I, O, Op>)
4187    where
4188        I: Data,
4189        O: Data,
4190        Op: StrictUnaryOperator<I, O>,
4191    {
4192        self.add_node(|id| {
4193            self.log_circuit_event(&CircuitEvent::strict_operator_output(
4194                GlobalNodeId::child_of(self, id),
4195                operator.name(),
4196                operator.location(),
4197            ));
4198
4199            let operator = Rc::new(RefCell::new(operator));
4200            let connector = FeedbackConnector::new(id, self.clone(), operator.clone());
4201            let output_node = FeedbackOutputNode::with_export(operator, self.clone(), id);
4202            let local = output_node.output_stream();
4203            let export = output_node.export_stream.clone().unwrap();
4204            (output_node, (ExportStream { local, export }, connector))
4205        })
4206    }
4207
4208    /// Connect feedback loop.
4209    ///
4210    /// Returns node id of the input half of Z-1.
4211    fn connect_feedback_with_preference<I, O, Op>(
4212        &self,
4213        output_node_id: NodeId,
4214        operator: Rc<RefCell<Op>>,
4215        input_stream: &Stream<Self, I>,
4216        input_preference: OwnershipPreference,
4217    ) where
4218        I: Data,
4219        O: Data,
4220        Op: StrictUnaryOperator<I, O>,
4221    {
4222        self.add_node(|id| {
4223            self.log_circuit_event(&CircuitEvent::strict_operator_input(
4224                GlobalNodeId::child_of(self, id),
4225                output_node_id,
4226            ));
4227
4228            let output_node = FeedbackInputNode::new(operator, input_stream.clone(), id);
4229            self.connect_stream(input_stream, id, input_preference);
4230            self.add_dependency(output_node_id, id);
4231            (output_node, ())
4232        })
4233    }
4234
4235    fn iterative_subcircuit<F, V, E>(&self, child_constructor: F) -> Result<V, SchedulerError>
4236    where
4237        F: FnOnce(&mut IterativeCircuit<Self>) -> Result<(V, E), SchedulerError>,
4238        E: Executor<IterativeCircuit<Self>>,
4239    {
4240        self.try_add_node(|id| {
4241            let global_id = GlobalNodeId::child_of(self, id);
4242            self.log_circuit_event(&CircuitEvent::subcircuit(global_id.clone(), true));
4243            let mut child_circuit = ChildCircuit::with_parent(self.clone(), id);
4244            let (res, executor) = child_constructor(&mut child_circuit)?;
4245            let child = <ChildNode<IterativeCircuit<Self>>>::new::<E>(child_circuit, 1, executor);
4246            self.log_circuit_event(&CircuitEvent::subcircuit_complete(global_id));
4247            Ok((child, res))
4248        })
4249    }
4250
4251    fn non_iterative_subcircuit<F, V, E>(&self, child_constructor: F) -> Result<V, SchedulerError>
4252    where
4253        F: FnOnce(&mut NonIterativeCircuit<Self>) -> Result<(V, E), SchedulerError>,
4254        E: Executor<NonIterativeCircuit<Self>>,
4255    {
4256        self.try_add_node(|id| {
4257            let global_id = GlobalNodeId::child_of(self, id);
4258            self.log_circuit_event(&CircuitEvent::subcircuit(global_id.clone(), false));
4259            let mut child_circuit = ChildCircuit::with_parent(self.clone(), id);
4260            let (res, executor) = child_constructor(&mut child_circuit)?;
4261            let child =
4262                <ChildNode<NonIterativeCircuit<Self>>>::new::<E>(child_circuit, 0, executor);
4263            self.log_circuit_event(&CircuitEvent::subcircuit_complete(global_id));
4264            Ok((child, res))
4265        })
4266    }
4267
4268    fn iterate<F, C, V>(&self, constructor: F) -> Result<V, SchedulerError>
4269    where
4270        F: FnOnce(&mut IterativeCircuit<Self>) -> Result<(C, V), SchedulerError>,
4271        C: AsyncFn() -> Result<bool, SchedulerError> + 'static,
4272    {
4273        self.iterate_with_scheduler::<F, C, V, DynamicScheduler>(constructor)
4274    }
4275
4276    /// Add an iteratively scheduled child circuit.
4277    ///
4278    /// Similar to [`iterate`](`Self::iterate`), but with a user-specified
4279    /// [`Scheduler`] implementation.
4280    fn iterate_with_scheduler<F, C, V, S>(&self, constructor: F) -> Result<V, SchedulerError>
4281    where
4282        F: FnOnce(&mut IterativeCircuit<Self>) -> Result<(C, V), SchedulerError>,
4283        C: AsyncFn() -> Result<bool, SchedulerError> + 'static,
4284        S: Scheduler + 'static,
4285    {
4286        self.iterative_subcircuit(|child| {
4287            let (termination_check, res) = constructor(child)?;
4288            let mut executor = <IterativeExecutor<_, S>>::new(termination_check);
4289            executor.prepare(child, None)?;
4290            Ok((res, executor))
4291        })
4292    }
4293
4294    fn fixedpoint<F, V>(&self, constructor: F) -> Result<V, SchedulerError>
4295    where
4296        F: FnOnce(&mut IterativeCircuit<Self>) -> Result<V, SchedulerError>,
4297    {
4298        self.fixedpoint_with_scheduler::<F, V, DynamicScheduler>(constructor)
4299    }
4300
4301    fn fixedpoint_with_scheduler<F, V, S>(&self, constructor: F) -> Result<V, SchedulerError>
4302    where
4303        F: FnOnce(&mut IterativeCircuit<Self>) -> Result<V, SchedulerError>,
4304        S: Scheduler + 'static,
4305    {
4306        self.iterative_subcircuit(|child| {
4307            let res = constructor(child)?;
4308            let child_clone = child.clone();
4309
4310            let consensus = Consensus::new();
4311
4312            let termination_check = async move || {
4313                // Send local fixed point status to all peers.
4314                let local_fixedpoint = child_clone.inner().check_fixedpoint(0);
4315                consensus.check(local_fixedpoint).await
4316            };
4317            let mut executor = <IterativeExecutor<_, S>>::new(termination_check);
4318            executor.prepare(child, None)?;
4319            Ok((res, executor))
4320        })
4321    }
4322
4323    fn import_stream<I, O, Op>(&self, operator: Op, parent_stream: &Stream<P, I>) -> Stream<Self, O>
4324    where
4325        Self::Parent: Circuit,
4326        I: Data,
4327        O: Data,
4328        Op: ImportOperator<I, O>,
4329    {
4330        let preference = operator.input_preference();
4331        self.import_stream_with_preference(operator, parent_stream, preference)
4332    }
4333
4334    fn import_stream_with_preference<I, O, Op>(
4335        &self,
4336        operator: Op,
4337        parent_stream: &Stream<P, I>,
4338        input_preference: OwnershipPreference,
4339    ) -> Stream<Self, O>
4340    where
4341        Self::Parent: Circuit,
4342        I: Data,
4343        O: Data,
4344        Op: ImportOperator<I, O>,
4345    {
4346        assert!(self.is_child_of(parent_stream.circuit()));
4347
4348        let output_stream = self.add_node(|id| {
4349            let node_id = self.global_node_id().child(id);
4350            self.log_circuit_event(&CircuitEvent::operator(
4351                node_id.clone(),
4352                operator.name(),
4353                operator.location(),
4354            ));
4355            let node = ImportNode::new(operator, self.clone(), parent_stream.clone(), id);
4356            // Note: here the edge points to the sub-circuit, and not to the ImportNode itself.
4357            self.parent()
4358                .connect_stream(parent_stream, self.node_id(), input_preference);
4359            // Log the actual edge going to the inner node as well
4360            self.parent().log_circuit_event(&CircuitEvent::stream(
4361                parent_stream.origin_node_id().clone(),
4362                node_id.clone(),
4363                input_preference,
4364            ));
4365            let output_stream = node.output_stream();
4366            (node, output_stream)
4367        });
4368
4369        self.add_import_node(output_stream.local_node_id());
4370
4371        output_stream
4372    }
4373
4374    fn add_replay_edges(&self, stream_id: StreamId, replay_stream: &dyn StreamMetadata) {
4375        let mut edges = self.edges_mut();
4376        let mut new_edges = Vec::new();
4377
4378        let Some(edges_to_replay) = edges.get_by_stream_id(&Some(stream_id)) else {
4379            return;
4380        };
4381
4382        for edge in edges_to_replay {
4383            // println!(
4384            //     "Adding replay edge ({}) {} -> {}",
4385            //     replay_stream.origin_node_id(),
4386            //     replay_stream.local_node_id(),
4387            //     edge.to
4388            // );
4389            new_edges.push(Edge {
4390                from: replay_stream.local_node_id(),
4391                to: edge.to,
4392                origin: replay_stream.origin_node_id().clone(),
4393                stream: Some(clone_box(replay_stream)),
4394                ownership_preference: edge.ownership_preference,
4395            });
4396        }
4397
4398        edges.extend(new_edges);
4399    }
4400}
4401struct ImportNode<C, I, O, Op>
4402where
4403    C: Circuit,
4404{
4405    id: GlobalNodeId,
4406    operator: Op,
4407    parent_stream: Stream<C::Parent, I>,
4408    output_stream: Stream<C, O>,
4409    labels: BTreeMap<String, String>,
4410}
4411
4412impl<C, I, O, Op> ImportNode<C, I, O, Op>
4413where
4414    C: Circuit,
4415    C::Parent: Circuit,
4416    I: Clone + 'static,
4417    O: Clone + 'static,
4418    Op: ImportOperator<I, O>,
4419{
4420    fn new(operator: Op, circuit: C, parent_stream: Stream<C::Parent, I>, id: NodeId) -> Self {
4421        assert!(Circuit::ptr_eq(&circuit.parent(), parent_stream.circuit()));
4422
4423        Self {
4424            id: circuit.global_node_id().child(id),
4425            operator,
4426            parent_stream,
4427            output_stream: Stream::new(circuit, id),
4428            labels: BTreeMap::new(),
4429        }
4430    }
4431
4432    fn output_stream(&self) -> Stream<C, O> {
4433        self.output_stream.clone()
4434    }
4435}
4436
4437impl<C, I, O, Op> Node for ImportNode<C, I, O, Op>
4438where
4439    C: Circuit,
4440    C::Parent: Circuit,
4441    I: Clone + 'static,
4442    O: Clone + 'static,
4443    Op: ImportOperator<I, O>,
4444{
4445    fn name(&self) -> Cow<'static, str> {
4446        self.operator.name()
4447    }
4448
4449    fn local_id(&self) -> NodeId {
4450        self.id.local_node_id().unwrap()
4451    }
4452
4453    fn global_id(&self) -> &GlobalNodeId {
4454        &self.id
4455    }
4456
4457    fn is_async(&self) -> bool {
4458        self.operator.is_async()
4459    }
4460
4461    fn is_input(&self) -> bool {
4462        self.operator.is_input()
4463    }
4464
4465    fn ready(&self) -> bool {
4466        self.operator.ready()
4467    }
4468
4469    fn register_ready_callback(&mut self, cb: Box<dyn Fn() + Send + Sync>) {
4470        self.operator.register_ready_callback(cb);
4471    }
4472
4473    fn eval<'a>(
4474        &'a mut self,
4475    ) -> Pin<Box<dyn Future<Output = Result<Option<Position>, SchedulerError>> + 'a>> {
4476        Box::pin(async {
4477            self.output_stream.put(self.operator.eval().await);
4478            Ok(self.operator.flush_progress())
4479        })
4480    }
4481
4482    fn import(&mut self) {
4483        match StreamValue::take(self.parent_stream.val()) {
4484            None => self
4485                .operator
4486                .import(StreamValue::peek(&self.parent_stream.get())),
4487            Some(val) => self.operator.import_owned(val),
4488        }
4489
4490        StreamValue::consume_token(self.parent_stream.val());
4491    }
4492
4493    fn start_transaction(&mut self) {
4494        self.operator.start_transaction();
4495    }
4496
4497    fn flush(&mut self) {
4498        self.operator.flush();
4499    }
4500
4501    fn is_flush_complete(&self) -> bool {
4502        self.operator.is_flush_complete()
4503    }
4504
4505    fn clock_start(&mut self, scope: Scope) {
4506        self.operator.clock_start(scope);
4507    }
4508
4509    fn clock_end(&mut self, scope: Scope) {
4510        self.operator.clock_end(scope);
4511    }
4512
4513    fn init(&mut self) {
4514        self.operator.init(&self.id);
4515    }
4516
4517    fn metadata(&self, output: &mut OperatorMeta) {
4518        self.operator.metadata(output);
4519    }
4520
4521    fn fixedpoint(&self, scope: Scope) -> bool {
4522        self.operator.fixedpoint(scope)
4523    }
4524
4525    fn checkpoint(
4526        &mut self,
4527        base: &StoragePath,
4528        files: &mut Vec<Arc<dyn FileCommitter>>,
4529    ) -> Result<(), DbspError> {
4530        self.operator
4531            .checkpoint(base, self.persistent_id().as_deref(), files)
4532    }
4533
4534    fn restore(&mut self, base: &StoragePath) -> Result<(), DbspError> {
4535        self.operator.restore(base, self.persistent_id().as_deref())
4536    }
4537
4538    fn clear_state(&mut self) -> Result<(), DbspError> {
4539        self.operator.clear_state()
4540    }
4541
4542    fn start_replay(&mut self) -> Result<(), DbspError> {
4543        self.operator.start_replay()
4544    }
4545
4546    fn end_replay(&mut self) -> Result<(), DbspError> {
4547        self.operator.end_replay()
4548    }
4549
4550    fn is_replay_complete(&self) -> bool {
4551        self.operator.is_replay_complete()
4552    }
4553
4554    fn set_label(&mut self, key: &str, value: &str) {
4555        self.labels.insert(key.to_string(), value.to_string());
4556    }
4557
4558    fn get_label(&self, key: &str) -> Option<&str> {
4559        self.labels.get(key).map(|s| s.as_str())
4560    }
4561
4562    fn labels(&self) -> &BTreeMap<String, String> {
4563        &self.labels
4564    }
4565
4566    fn as_any(&self) -> &dyn Any {
4567        self
4568    }
4569}
4570
4571struct SourceNode<C, O, Op> {
4572    id: GlobalNodeId,
4573    operator: Op,
4574    output_stream: Stream<C, O>,
4575    labels: BTreeMap<String, String>,
4576}
4577
4578impl<C, O, Op> SourceNode<C, O, Op>
4579where
4580    Op: SourceOperator<O>,
4581    C: Circuit,
4582{
4583    fn new(operator: Op, circuit: C, id: NodeId) -> Self {
4584        Self {
4585            id: circuit.global_node_id().child(id),
4586            operator,
4587            output_stream: Stream::new(circuit, id),
4588            labels: BTreeMap::new(),
4589        }
4590    }
4591
4592    fn output_stream(&self) -> Stream<C, O> {
4593        self.output_stream.clone()
4594    }
4595}
4596
4597impl<C, O, Op> Node for SourceNode<C, O, Op>
4598where
4599    C: Circuit,
4600    O: Clone + 'static,
4601    Op: SourceOperator<O>,
4602{
4603    fn name(&self) -> Cow<'static, str> {
4604        self.operator.name()
4605    }
4606
4607    fn local_id(&self) -> NodeId {
4608        self.id.local_node_id().unwrap()
4609    }
4610
4611    fn global_id(&self) -> &GlobalNodeId {
4612        &self.id
4613    }
4614
4615    fn is_async(&self) -> bool {
4616        self.operator.is_async()
4617    }
4618
4619    fn is_input(&self) -> bool {
4620        self.operator.is_input()
4621    }
4622
4623    fn ready(&self) -> bool {
4624        self.operator.ready()
4625    }
4626
4627    fn register_ready_callback(&mut self, cb: Box<dyn Fn() + Send + Sync>) {
4628        self.operator.register_ready_callback(cb);
4629    }
4630
4631    fn eval<'a>(
4632        &'a mut self,
4633    ) -> Pin<Box<dyn Future<Output = Result<Option<Position>, SchedulerError>> + 'a>> {
4634        Box::pin(async {
4635            self.output_stream.put(self.operator.eval().await);
4636            Ok(self.operator.flush_progress())
4637        })
4638    }
4639
4640    fn start_transaction(&mut self) {
4641        self.operator.start_transaction();
4642    }
4643
4644    fn flush(&mut self) {
4645        self.operator.flush();
4646    }
4647
4648    fn is_flush_complete(&self) -> bool {
4649        self.operator.is_flush_complete()
4650    }
4651
4652    fn clock_start(&mut self, scope: Scope) {
4653        self.operator.clock_start(scope);
4654    }
4655
4656    fn clock_end(&mut self, scope: Scope) {
4657        self.operator.clock_end(scope);
4658    }
4659
4660    fn init(&mut self) {
4661        self.operator.init(&self.id);
4662    }
4663
4664    fn metadata(&self, output: &mut OperatorMeta) {
4665        self.operator.metadata(output);
4666    }
4667
4668    fn fixedpoint(&self, scope: Scope) -> bool {
4669        self.operator.fixedpoint(scope)
4670    }
4671
4672    fn checkpoint(
4673        &mut self,
4674        base: &StoragePath,
4675        files: &mut Vec<Arc<dyn FileCommitter>>,
4676    ) -> Result<(), DbspError> {
4677        self.operator
4678            .checkpoint(base, self.persistent_id().as_deref(), files)
4679    }
4680
4681    fn restore(&mut self, base: &StoragePath) -> Result<(), DbspError> {
4682        self.operator.restore(base, self.persistent_id().as_deref())
4683    }
4684
4685    fn clear_state(&mut self) -> Result<(), DbspError> {
4686        self.operator.clear_state()
4687    }
4688
4689    fn start_replay(&mut self) -> Result<(), DbspError> {
4690        self.operator.start_replay()
4691    }
4692
4693    fn is_replay_complete(&self) -> bool {
4694        self.operator.is_replay_complete()
4695    }
4696
4697    fn end_replay(&mut self) -> Result<(), DbspError> {
4698        self.operator.end_replay()
4699    }
4700
4701    fn set_label(&mut self, key: &str, value: &str) {
4702        self.labels.insert(key.to_string(), value.to_string());
4703    }
4704
4705    fn get_label(&self, key: &str) -> Option<&str> {
4706        self.labels.get(key).map(|s| s.as_str())
4707    }
4708
4709    fn labels(&self) -> &BTreeMap<String, String> {
4710        &self.labels
4711    }
4712
4713    fn as_any(&self) -> &dyn Any {
4714        self
4715    }
4716}
4717
4718struct UnaryNode<C, I, O, Op> {
4719    id: GlobalNodeId,
4720    operator: Op,
4721    input_stream: Stream<C, I>,
4722    output_stream: Stream<C, O>,
4723    labels: BTreeMap<String, String>,
4724}
4725
4726impl<C, I, O, Op> UnaryNode<C, I, O, Op>
4727where
4728    Op: UnaryOperator<I, O>,
4729    C: Circuit,
4730{
4731    fn new(operator: Op, input_stream: Stream<C, I>, circuit: C, id: NodeId) -> Self {
4732        Self {
4733            id: circuit.global_node_id().child(id),
4734            operator,
4735            input_stream,
4736            output_stream: Stream::new(circuit, id),
4737            labels: BTreeMap::new(),
4738        }
4739    }
4740
4741    fn output_stream(&self) -> Stream<C, O> {
4742        self.output_stream.clone()
4743    }
4744}
4745
4746impl<C, I, O, Op> Node for UnaryNode<C, I, O, Op>
4747where
4748    C: Circuit,
4749    I: Clone + 'static,
4750    O: Clone + 'static,
4751    Op: UnaryOperator<I, O>,
4752{
4753    fn name(&self) -> Cow<'static, str> {
4754        self.operator.name()
4755    }
4756
4757    fn local_id(&self) -> NodeId {
4758        self.id.local_node_id().unwrap()
4759    }
4760
4761    fn global_id(&self) -> &GlobalNodeId {
4762        &self.id
4763    }
4764
4765    fn is_async(&self) -> bool {
4766        self.operator.is_async()
4767    }
4768
4769    fn is_input(&self) -> bool {
4770        self.operator.is_input()
4771    }
4772
4773    fn ready(&self) -> bool {
4774        self.operator.ready()
4775    }
4776
4777    fn register_ready_callback(&mut self, cb: Box<dyn Fn() + Send + Sync>) {
4778        self.operator.register_ready_callback(cb);
4779    }
4780
4781    // Justification: see StreamValue::take() comment.
4782    #[allow(clippy::await_holding_refcell_ref)]
4783    fn eval<'a>(
4784        &'a mut self,
4785    ) -> Pin<Box<dyn Future<Output = Result<Option<Position>, SchedulerError>> + 'a>> {
4786        Box::pin(async {
4787            self.output_stream
4788                .put(match StreamValue::take(self.input_stream.val()) {
4789                    Some(v) => self.operator.eval_owned(v).await,
4790                    None => {
4791                        self.operator
4792                            .eval(StreamValue::peek(&self.input_stream.get()))
4793                            .await
4794                    }
4795                });
4796            StreamValue::consume_token(self.input_stream.val());
4797            Ok(self.operator.flush_progress())
4798        })
4799    }
4800
4801    fn start_transaction(&mut self) {
4802        self.operator.start_transaction();
4803    }
4804
4805    fn flush(&mut self) {
4806        self.operator.flush();
4807    }
4808
4809    fn is_flush_complete(&self) -> bool {
4810        self.operator.is_flush_complete()
4811    }
4812
4813    fn clock_start(&mut self, scope: Scope) {
4814        self.operator.clock_start(scope);
4815    }
4816
4817    fn clock_end(&mut self, scope: Scope) {
4818        self.operator.clock_end(scope);
4819    }
4820
4821    fn init(&mut self) {
4822        self.operator.init(&self.id);
4823    }
4824
4825    fn metadata(&self, output: &mut OperatorMeta) {
4826        self.operator.metadata(output);
4827    }
4828
4829    fn fixedpoint(&self, scope: Scope) -> bool {
4830        self.operator.fixedpoint(scope)
4831    }
4832
4833    fn checkpoint(
4834        &mut self,
4835        base: &StoragePath,
4836        files: &mut Vec<Arc<dyn FileCommitter>>,
4837    ) -> Result<(), DbspError> {
4838        self.operator
4839            .checkpoint(base, self.persistent_id().as_deref(), files)
4840    }
4841
4842    fn restore(&mut self, base: &StoragePath) -> Result<(), DbspError> {
4843        self.operator.restore(base, self.persistent_id().as_deref())
4844    }
4845
4846    fn clear_state(&mut self) -> Result<(), DbspError> {
4847        self.operator.clear_state()
4848    }
4849
4850    fn start_replay(&mut self) -> Result<(), DbspError> {
4851        self.operator.start_replay()
4852    }
4853
4854    fn is_replay_complete(&self) -> bool {
4855        self.operator.is_replay_complete()
4856    }
4857
4858    fn end_replay(&mut self) -> Result<(), DbspError> {
4859        self.operator.end_replay()
4860    }
4861
4862    fn set_label(&mut self, key: &str, value: &str) {
4863        self.labels.insert(key.to_string(), value.to_string());
4864    }
4865
4866    fn get_label(&self, key: &str) -> Option<&str> {
4867        self.labels.get(key).map(|s| s.as_str())
4868    }
4869
4870    fn labels(&self) -> &BTreeMap<String, String> {
4871        &self.labels
4872    }
4873
4874    fn as_any(&self) -> &dyn Any {
4875        self
4876    }
4877}
4878
4879struct SinkNode<C, I, Op> {
4880    id: GlobalNodeId,
4881    operator: Op,
4882    input_stream: Stream<C, I>,
4883    labels: BTreeMap<String, String>,
4884}
4885
4886impl<C, I, Op> SinkNode<C, I, Op>
4887where
4888    Op: SinkOperator<I>,
4889    C: Circuit,
4890{
4891    fn new(operator: Op, input_stream: Stream<C, I>, circuit: C, id: NodeId) -> Self {
4892        Self {
4893            id: circuit.global_node_id().child(id),
4894            operator,
4895            input_stream,
4896            labels: BTreeMap::new(),
4897        }
4898    }
4899}
4900
4901impl<C, I, Op> Node for SinkNode<C, I, Op>
4902where
4903    C: Circuit,
4904    I: Clone + 'static,
4905    Op: SinkOperator<I>,
4906{
4907    fn name(&self) -> Cow<'static, str> {
4908        self.operator.name()
4909    }
4910
4911    fn local_id(&self) -> NodeId {
4912        self.id.local_node_id().unwrap()
4913    }
4914
4915    fn global_id(&self) -> &GlobalNodeId {
4916        &self.id
4917    }
4918
4919    fn is_async(&self) -> bool {
4920        self.operator.is_async()
4921    }
4922
4923    fn is_input(&self) -> bool {
4924        self.operator.is_input()
4925    }
4926
4927    fn ready(&self) -> bool {
4928        self.operator.ready()
4929    }
4930
4931    fn register_ready_callback(&mut self, cb: Box<dyn Fn() + Send + Sync>) {
4932        self.operator.register_ready_callback(cb);
4933    }
4934
4935    // Justification: see StreamValue::take() comment.
4936    #[allow(clippy::await_holding_refcell_ref)]
4937    fn eval<'a>(
4938        &'a mut self,
4939    ) -> Pin<Box<dyn Future<Output = Result<Option<Position>, SchedulerError>> + 'a>> {
4940        Box::pin(async {
4941            match StreamValue::take(self.input_stream.val()) {
4942                Some(v) => self.operator.eval_owned(v).await,
4943                None => {
4944                    self.operator
4945                        .eval(StreamValue::peek(&self.input_stream.get()))
4946                        .await
4947                }
4948            };
4949            StreamValue::consume_token(self.input_stream.val());
4950
4951            Ok(self.operator.flush_progress())
4952        })
4953    }
4954
4955    fn start_transaction(&mut self) {
4956        self.operator.start_transaction();
4957    }
4958
4959    fn flush(&mut self) {
4960        self.operator.flush();
4961    }
4962
4963    fn is_flush_complete(&self) -> bool {
4964        self.operator.is_flush_complete()
4965    }
4966
4967    fn clock_start(&mut self, scope: Scope) {
4968        self.operator.clock_start(scope);
4969    }
4970
4971    fn clock_end(&mut self, scope: Scope) {
4972        self.operator.clock_end(scope);
4973    }
4974
4975    fn init(&mut self) {
4976        self.operator.init(&self.id);
4977    }
4978
4979    fn metadata(&self, output: &mut OperatorMeta) {
4980        self.operator.metadata(output);
4981    }
4982
4983    fn fixedpoint(&self, scope: Scope) -> bool {
4984        self.operator.fixedpoint(scope)
4985    }
4986
4987    fn checkpoint(
4988        &mut self,
4989        base: &StoragePath,
4990        files: &mut Vec<Arc<dyn FileCommitter>>,
4991    ) -> Result<(), DbspError> {
4992        self.operator
4993            .checkpoint(base, self.persistent_id().as_deref(), files)
4994    }
4995
4996    fn restore(&mut self, base: &StoragePath) -> Result<(), DbspError> {
4997        self.operator.restore(base, self.persistent_id().as_deref())
4998    }
4999
5000    fn clear_state(&mut self) -> Result<(), DbspError> {
5001        self.operator.clear_state()
5002    }
5003
5004    fn start_replay(&mut self) -> Result<(), DbspError> {
5005        self.operator.start_replay()
5006    }
5007
5008    fn is_replay_complete(&self) -> bool {
5009        self.operator.is_replay_complete()
5010    }
5011
5012    fn end_replay(&mut self) -> Result<(), DbspError> {
5013        self.operator.end_replay()
5014    }
5015
5016    fn set_label(&mut self, key: &str, value: &str) {
5017        self.labels.insert(key.to_string(), value.to_string());
5018    }
5019
5020    fn get_label(&self, key: &str) -> Option<&str> {
5021        self.labels.get(key).map(|s| s.as_str())
5022    }
5023
5024    fn labels(&self) -> &BTreeMap<String, String> {
5025        &self.labels
5026    }
5027
5028    fn as_any(&self) -> &dyn Any {
5029        self
5030    }
5031}
5032
5033struct BinarySinkNode<C, I1, I2, Op> {
5034    id: GlobalNodeId,
5035    operator: Op,
5036    input_stream1: Stream<C, I1>,
5037    input_stream2: Stream<C, I2>,
5038    // `true` if both input streams are aliases of the same stream.
5039    is_alias: bool,
5040    labels: BTreeMap<String, String>,
5041}
5042
5043impl<C, I1, I2, Op> BinarySinkNode<C, I1, I2, Op>
5044where
5045    I1: Clone,
5046    I2: Clone,
5047    Op: BinarySinkOperator<I1, I2>,
5048    C: Circuit,
5049{
5050    fn new(
5051        operator: Op,
5052        input_stream1: Stream<C, I1>,
5053        input_stream2: Stream<C, I2>,
5054        circuit: C,
5055        id: NodeId,
5056    ) -> Self {
5057        let is_alias = input_stream1.ptr_eq(&input_stream2);
5058        Self {
5059            id: circuit.global_node_id().child(id),
5060            operator,
5061            input_stream1,
5062            input_stream2,
5063            is_alias,
5064            labels: BTreeMap::new(),
5065        }
5066    }
5067}
5068
5069impl<C, I1, I2, Op> Node for BinarySinkNode<C, I1, I2, Op>
5070where
5071    C: Circuit,
5072    I1: Clone + 'static,
5073    I2: Clone + 'static,
5074    Op: BinarySinkOperator<I1, I2>,
5075{
5076    fn name(&self) -> Cow<'static, str> {
5077        self.operator.name()
5078    }
5079
5080    fn local_id(&self) -> NodeId {
5081        self.id.local_node_id().unwrap()
5082    }
5083
5084    fn global_id(&self) -> &GlobalNodeId {
5085        &self.id
5086    }
5087
5088    fn is_async(&self) -> bool {
5089        self.operator.is_async()
5090    }
5091
5092    fn is_input(&self) -> bool {
5093        self.operator.is_input()
5094    }
5095
5096    fn ready(&self) -> bool {
5097        self.operator.ready()
5098    }
5099
5100    fn register_ready_callback(&mut self, cb: Box<dyn Fn() + Send + Sync>) {
5101        self.operator.register_ready_callback(cb);
5102    }
5103
5104    // Justification: see StreamValue::take() comment.
5105    #[allow(clippy::await_holding_refcell_ref)]
5106    fn eval<'a>(
5107        &'a mut self,
5108    ) -> Pin<Box<dyn Future<Output = Result<Option<Position>, SchedulerError>> + 'a>> {
5109        Box::pin(async {
5110            if self.is_alias {
5111                {
5112                    let val1 = self.input_stream1.get();
5113                    let val2 = self.input_stream2.get();
5114                    self.operator
5115                        .eval(
5116                            Cow::Borrowed(StreamValue::peek(&val1)),
5117                            Cow::Borrowed(StreamValue::peek(&val2)),
5118                        )
5119                        .await;
5120                }
5121
5122                StreamValue::consume_token(self.input_stream1.val());
5123                StreamValue::consume_token(self.input_stream2.val());
5124            } else {
5125                let val1 = StreamValue::take(self.input_stream1.val());
5126                let val2 = StreamValue::take(self.input_stream2.val());
5127
5128                match (val1, val2) {
5129                    (Some(val1), Some(val2)) => {
5130                        self.operator.eval(Cow::Owned(val1), Cow::Owned(val2)).await;
5131                    }
5132                    (Some(val1), None) => {
5133                        self.operator
5134                            .eval(
5135                                Cow::Owned(val1),
5136                                Cow::Borrowed(StreamValue::peek(&self.input_stream2.get())),
5137                            )
5138                            .await;
5139                    }
5140                    (None, Some(val2)) => {
5141                        self.operator
5142                            .eval(
5143                                Cow::Borrowed(StreamValue::peek(&self.input_stream1.get())),
5144                                Cow::Owned(val2),
5145                            )
5146                            .await;
5147                    }
5148                    (None, None) => {
5149                        self.operator
5150                            .eval(
5151                                Cow::Borrowed(StreamValue::peek(&self.input_stream1.get())),
5152                                Cow::Borrowed(StreamValue::peek(&self.input_stream2.get())),
5153                            )
5154                            .await;
5155                    }
5156                }
5157
5158                StreamValue::consume_token(self.input_stream1.val());
5159                StreamValue::consume_token(self.input_stream2.val());
5160            };
5161
5162            Ok(self.operator.flush_progress())
5163        })
5164    }
5165
5166    fn start_transaction(&mut self) {
5167        self.operator.start_transaction();
5168    }
5169
5170    fn flush(&mut self) {
5171        self.operator.flush();
5172    }
5173
5174    fn is_flush_complete(&self) -> bool {
5175        self.operator.is_flush_complete()
5176    }
5177
5178    fn clock_start(&mut self, scope: Scope) {
5179        self.operator.clock_start(scope);
5180    }
5181
5182    fn clock_end(&mut self, scope: Scope) {
5183        self.operator.clock_end(scope);
5184    }
5185
5186    fn init(&mut self) {
5187        self.operator.init(&self.id);
5188    }
5189
5190    fn metadata(&self, output: &mut OperatorMeta) {
5191        self.operator.metadata(output);
5192    }
5193
5194    fn fixedpoint(&self, scope: Scope) -> bool {
5195        self.operator.fixedpoint(scope)
5196    }
5197
5198    fn checkpoint(
5199        &mut self,
5200        base: &StoragePath,
5201        files: &mut Vec<Arc<dyn FileCommitter>>,
5202    ) -> Result<(), DbspError> {
5203        self.operator
5204            .checkpoint(base, self.persistent_id().as_deref(), files)
5205    }
5206
5207    fn restore(&mut self, base: &StoragePath) -> Result<(), DbspError> {
5208        self.operator.restore(base, self.persistent_id().as_deref())
5209    }
5210
5211    fn clear_state(&mut self) -> Result<(), DbspError> {
5212        self.operator.clear_state()
5213    }
5214
5215    fn start_replay(&mut self) -> Result<(), DbspError> {
5216        self.operator.start_replay()
5217    }
5218
5219    fn is_replay_complete(&self) -> bool {
5220        self.operator.is_replay_complete()
5221    }
5222
5223    fn end_replay(&mut self) -> Result<(), DbspError> {
5224        self.operator.end_replay()
5225    }
5226
5227    fn set_label(&mut self, key: &str, value: &str) {
5228        self.labels.insert(key.to_string(), value.to_string());
5229    }
5230
5231    fn get_label(&self, key: &str) -> Option<&str> {
5232        self.labels.get(key).map(|s| s.as_str())
5233    }
5234
5235    fn labels(&self) -> &BTreeMap<String, String> {
5236        &self.labels
5237    }
5238
5239    fn as_any(&self) -> &dyn Any {
5240        self
5241    }
5242}
5243
5244struct TernarySinkNode<C, I1, I2, I3, Op> {
5245    id: GlobalNodeId,
5246    operator: Op,
5247    input_stream1: Stream<C, I1>,
5248    input_stream2: Stream<C, I2>,
5249    input_stream3: Stream<C, I3>,
5250    labels: BTreeMap<String, String>,
5251}
5252
5253impl<C, I1, I2, I3, Op> TernarySinkNode<C, I1, I2, I3, Op>
5254where
5255    I1: Clone,
5256    I2: Clone,
5257    I3: Clone,
5258    Op: TernarySinkOperator<I1, I2, I3>,
5259    C: Circuit,
5260{
5261    fn new(
5262        operator: Op,
5263        input_stream1: Stream<C, I1>,
5264        input_stream2: Stream<C, I2>,
5265        input_stream3: Stream<C, I3>,
5266        circuit: C,
5267        id: NodeId,
5268    ) -> Self {
5269        assert!(!input_stream1.ptr_eq(&input_stream2));
5270        assert!(!input_stream1.ptr_eq(&input_stream3));
5271        assert!(!input_stream2.ptr_eq(&input_stream3));
5272
5273        Self {
5274            id: circuit.global_node_id().child(id),
5275            operator,
5276            input_stream1,
5277            input_stream2,
5278            input_stream3,
5279            labels: BTreeMap::new(),
5280        }
5281    }
5282}
5283
5284impl<C, I1, I2, I3, Op> Node for TernarySinkNode<C, I1, I2, I3, Op>
5285where
5286    C: Circuit,
5287    I1: Clone + 'static,
5288    I2: Clone + 'static,
5289    I3: Clone + 'static,
5290    Op: TernarySinkOperator<I1, I2, I3>,
5291{
5292    fn name(&self) -> Cow<'static, str> {
5293        self.operator.name()
5294    }
5295
5296    fn local_id(&self) -> NodeId {
5297        self.id.local_node_id().unwrap()
5298    }
5299
5300    fn global_id(&self) -> &GlobalNodeId {
5301        &self.id
5302    }
5303
5304    fn is_async(&self) -> bool {
5305        self.operator.is_async()
5306    }
5307
5308    fn is_input(&self) -> bool {
5309        self.operator.is_input()
5310    }
5311
5312    fn ready(&self) -> bool {
5313        self.operator.ready()
5314    }
5315
5316    fn register_ready_callback(&mut self, cb: Box<dyn Fn() + Send + Sync>) {
5317        self.operator.register_ready_callback(cb);
5318    }
5319
5320    // Justification: see StreamValue::take() comment.
5321    #[allow(clippy::await_holding_refcell_ref)]
5322    fn eval<'a>(
5323        &'a mut self,
5324    ) -> Pin<Box<dyn Future<Output = Result<Option<Position>, SchedulerError>> + 'a>> {
5325        Box::pin(async {
5326            let val1 = StreamValue::take(self.input_stream1.val()).map(|val| Cow::Owned(val));
5327            let r1 = self.input_stream1.get();
5328            let val2 = StreamValue::take(self.input_stream2.val()).map(|val| Cow::Owned(val));
5329            let r2 = self.input_stream2.get();
5330            let val3 = StreamValue::take(self.input_stream3.val()).map(|val| Cow::Owned(val));
5331            let r3 = self.input_stream3.get();
5332
5333            self.operator
5334                .eval(
5335                    val1.unwrap_or_else(|| Cow::Borrowed(StreamValue::peek(&r1))),
5336                    val2.unwrap_or_else(|| Cow::Borrowed(StreamValue::peek(&r2))),
5337                    val3.unwrap_or_else(|| Cow::Borrowed(StreamValue::peek(&r3))),
5338                )
5339                .await;
5340
5341            drop(r1);
5342            drop(r2);
5343            drop(r3);
5344
5345            StreamValue::consume_token(self.input_stream1.val());
5346            StreamValue::consume_token(self.input_stream2.val());
5347            StreamValue::consume_token(self.input_stream3.val());
5348
5349            Ok(self.operator.flush_progress())
5350        })
5351    }
5352
5353    fn start_transaction(&mut self) {
5354        self.operator.start_transaction();
5355    }
5356
5357    fn flush(&mut self) {
5358        self.operator.flush();
5359    }
5360
5361    fn is_flush_complete(&self) -> bool {
5362        self.operator.is_flush_complete()
5363    }
5364
5365    fn clock_start(&mut self, scope: Scope) {
5366        self.operator.clock_start(scope);
5367    }
5368
5369    fn clock_end(&mut self, scope: Scope) {
5370        self.operator.clock_end(scope);
5371    }
5372
5373    fn init(&mut self) {
5374        self.operator.init(&self.id);
5375    }
5376
5377    fn metadata(&self, output: &mut OperatorMeta) {
5378        self.operator.metadata(output);
5379    }
5380
5381    fn fixedpoint(&self, scope: Scope) -> bool {
5382        self.operator.fixedpoint(scope)
5383    }
5384
5385    fn checkpoint(
5386        &mut self,
5387        base: &StoragePath,
5388        files: &mut Vec<Arc<dyn FileCommitter>>,
5389    ) -> Result<(), DbspError> {
5390        self.operator
5391            .checkpoint(base, self.persistent_id().as_deref(), files)
5392    }
5393
5394    fn restore(&mut self, base: &StoragePath) -> Result<(), DbspError> {
5395        self.operator.restore(base, self.persistent_id().as_deref())
5396    }
5397
5398    fn clear_state(&mut self) -> Result<(), DbspError> {
5399        self.operator.clear_state()
5400    }
5401
5402    fn start_replay(&mut self) -> Result<(), DbspError> {
5403        self.operator.start_replay()
5404    }
5405
5406    fn is_replay_complete(&self) -> bool {
5407        self.operator.is_replay_complete()
5408    }
5409
5410    fn end_replay(&mut self) -> Result<(), DbspError> {
5411        self.operator.end_replay()
5412    }
5413
5414    fn set_label(&mut self, key: &str, value: &str) {
5415        self.labels.insert(key.to_string(), value.to_string());
5416    }
5417
5418    fn get_label(&self, key: &str) -> Option<&str> {
5419        self.labels.get(key).map(|s| s.as_str())
5420    }
5421
5422    fn labels(&self) -> &BTreeMap<String, String> {
5423        &self.labels
5424    }
5425
5426    fn as_any(&self) -> &dyn Any {
5427        self
5428    }
5429}
5430
5431struct BinaryNode<C, I1, I2, O, Op> {
5432    id: GlobalNodeId,
5433    operator: Op,
5434    input_stream1: Stream<C, I1>,
5435    input_stream2: Stream<C, I2>,
5436    output_stream: Stream<C, O>,
5437    // `true` if both input streams are aliases of the same stream.
5438    is_alias: bool,
5439    labels: BTreeMap<String, String>,
5440}
5441
5442impl<C, I1, I2, O, Op> BinaryNode<C, I1, I2, O, Op>
5443where
5444    Op: BinaryOperator<I1, I2, O>,
5445    C: Circuit,
5446{
5447    fn new(
5448        operator: Op,
5449        input_stream1: Stream<C, I1>,
5450        input_stream2: Stream<C, I2>,
5451        circuit: C,
5452        id: NodeId,
5453    ) -> Self {
5454        let is_alias = input_stream1.ptr_eq(&input_stream2);
5455        Self {
5456            id: circuit.global_node_id().child(id),
5457            operator,
5458            input_stream1,
5459            input_stream2,
5460            is_alias,
5461            output_stream: Stream::new(circuit, id),
5462            labels: BTreeMap::new(),
5463        }
5464    }
5465
5466    fn output_stream(&self) -> Stream<C, O> {
5467        self.output_stream.clone()
5468    }
5469}
5470
5471impl<C, I1, I2, O, Op> Node for BinaryNode<C, I1, I2, O, Op>
5472where
5473    C: Circuit,
5474    I1: Clone + 'static,
5475    I2: Clone + 'static,
5476    O: Clone + 'static,
5477    Op: BinaryOperator<I1, I2, O>,
5478{
5479    fn name(&self) -> Cow<'static, str> {
5480        self.operator.name()
5481    }
5482
5483    fn local_id(&self) -> NodeId {
5484        self.id.local_node_id().unwrap()
5485    }
5486
5487    fn global_id(&self) -> &GlobalNodeId {
5488        &self.id
5489    }
5490
5491    fn is_async(&self) -> bool {
5492        self.operator.is_async()
5493    }
5494
5495    fn is_input(&self) -> bool {
5496        self.operator.is_input()
5497    }
5498
5499    fn ready(&self) -> bool {
5500        self.operator.ready()
5501    }
5502
5503    fn register_ready_callback(&mut self, cb: Box<dyn Fn() + Send + Sync>) {
5504        self.operator.register_ready_callback(cb);
5505    }
5506
5507    // Justification: see StreamValue::take() comment.
5508    #[allow(clippy::await_holding_refcell_ref)]
5509    fn eval<'a>(
5510        &'a mut self,
5511    ) -> Pin<Box<dyn Future<Output = Result<Option<Position>, SchedulerError>> + 'a>> {
5512        Box::pin(async {
5513            // If the two input streams are aliases, we cannot remove the owned
5514            // value from `input_stream2`, as this will invalidate the borrow
5515            // from `input_stream1`.  Instead use `peek` to obtain the value by
5516            // reference.
5517            if self.is_alias {
5518                {
5519                    let val1 = self.input_stream1.get();
5520                    let val2 = self.input_stream2.get();
5521
5522                    self.output_stream.put(
5523                        self.operator
5524                            .eval(StreamValue::peek(&val1), StreamValue::peek(&val2))
5525                            .await,
5526                    );
5527                }
5528                // It is now safe to call `take`, and we must do so to decrement
5529                // the ref counter.
5530                StreamValue::consume_token(self.input_stream1.val());
5531                StreamValue::consume_token(self.input_stream2.val());
5532            } else {
5533                let val1 = StreamValue::take(self.input_stream1.val());
5534                let val2 = StreamValue::take(self.input_stream2.val());
5535
5536                self.output_stream.put(match (val1, val2) {
5537                    (Some(val1), Some(val2)) => self.operator.eval_owned(val1, val2).await,
5538                    (Some(val1), None) => {
5539                        self.operator
5540                            .eval_owned_and_ref(val1, StreamValue::peek(&self.input_stream2.get()))
5541                            .await
5542                    }
5543                    (None, Some(val2)) => {
5544                        self.operator
5545                            .eval_ref_and_owned(StreamValue::peek(&self.input_stream1.get()), val2)
5546                            .await
5547                    }
5548                    (None, None) => {
5549                        self.operator
5550                            .eval(
5551                                StreamValue::peek(&self.input_stream1.get()),
5552                                StreamValue::peek(&self.input_stream2.get()),
5553                            )
5554                            .await
5555                    }
5556                });
5557                StreamValue::consume_token(self.input_stream1.val());
5558                StreamValue::consume_token(self.input_stream2.val());
5559            }
5560            Ok(self.operator.flush_progress())
5561        })
5562    }
5563
5564    fn start_transaction(&mut self) {
5565        self.operator.start_transaction();
5566    }
5567
5568    fn flush(&mut self) {
5569        self.operator.flush();
5570    }
5571
5572    fn is_flush_complete(&self) -> bool {
5573        self.operator.is_flush_complete()
5574    }
5575
5576    fn clock_start(&mut self, scope: Scope) {
5577        self.operator.clock_start(scope);
5578    }
5579
5580    fn clock_end(&mut self, scope: Scope) {
5581        self.operator.clock_end(scope);
5582    }
5583
5584    fn init(&mut self) {
5585        self.operator.init(&self.id);
5586    }
5587
5588    fn metadata(&self, output: &mut OperatorMeta) {
5589        self.operator.metadata(output);
5590    }
5591
5592    fn fixedpoint(&self, scope: Scope) -> bool {
5593        self.operator.fixedpoint(scope)
5594    }
5595
5596    fn checkpoint(
5597        &mut self,
5598        base: &StoragePath,
5599        files: &mut Vec<Arc<dyn FileCommitter>>,
5600    ) -> Result<(), DbspError> {
5601        self.operator
5602            .checkpoint(base, self.persistent_id().as_deref(), files)
5603    }
5604
5605    fn restore(&mut self, base: &StoragePath) -> Result<(), DbspError> {
5606        self.operator.restore(base, self.persistent_id().as_deref())
5607    }
5608
5609    fn clear_state(&mut self) -> Result<(), DbspError> {
5610        self.operator.clear_state()
5611    }
5612
5613    fn start_replay(&mut self) -> Result<(), DbspError> {
5614        self.operator.start_replay()
5615    }
5616
5617    fn is_replay_complete(&self) -> bool {
5618        self.operator.is_replay_complete()
5619    }
5620
5621    fn end_replay(&mut self) -> Result<(), DbspError> {
5622        self.operator.end_replay()
5623    }
5624
5625    fn set_label(&mut self, key: &str, value: &str) {
5626        self.labels.insert(key.to_string(), value.to_string());
5627    }
5628
5629    fn get_label(&self, key: &str) -> Option<&str> {
5630        self.labels.get(key).map(|s| s.as_str())
5631    }
5632
5633    fn labels(&self) -> &BTreeMap<String, String> {
5634        &self.labels
5635    }
5636
5637    fn as_any(&self) -> &dyn Any {
5638        self
5639    }
5640}
5641
5642struct TernaryNode<C, I1, I2, I3, O, Op> {
5643    id: GlobalNodeId,
5644    operator: Op,
5645    input_stream1: Stream<C, I1>,
5646    input_stream2: Stream<C, I2>,
5647    input_stream3: Stream<C, I3>,
5648    output_stream: Stream<C, O>,
5649    labels: BTreeMap<String, String>,
5650}
5651
5652impl<C, I1, I2, I3, O, Op> TernaryNode<C, I1, I2, I3, O, Op>
5653where
5654    I1: Clone,
5655    I2: Clone,
5656    I3: Clone,
5657    Op: TernaryOperator<I1, I2, I3, O>,
5658    C: Circuit,
5659{
5660    fn new(
5661        operator: Op,
5662        input_stream1: Stream<C, I1>,
5663        input_stream2: Stream<C, I2>,
5664        input_stream3: Stream<C, I3>,
5665        circuit: C,
5666        id: NodeId,
5667    ) -> Self {
5668        Self {
5669            id: circuit.global_node_id().child(id),
5670            operator,
5671            input_stream1,
5672            input_stream2,
5673            input_stream3,
5674            // is_alias1,
5675            // is_alias2,
5676            output_stream: Stream::new(circuit, id),
5677            labels: BTreeMap::new(),
5678        }
5679    }
5680
5681    fn output_stream(&self) -> Stream<C, O> {
5682        self.output_stream.clone()
5683    }
5684}
5685
5686impl<C, I1, I2, I3, O, Op> Node for TernaryNode<C, I1, I2, I3, O, Op>
5687where
5688    C: Circuit,
5689    I1: Clone + 'static,
5690    I2: Clone + 'static,
5691    I3: Clone + 'static,
5692    O: Clone + 'static,
5693    Op: TernaryOperator<I1, I2, I3, O>,
5694{
5695    fn name(&self) -> Cow<'static, str> {
5696        self.operator.name()
5697    }
5698
5699    fn local_id(&self) -> NodeId {
5700        self.id.local_node_id().unwrap()
5701    }
5702
5703    fn global_id(&self) -> &GlobalNodeId {
5704        &self.id
5705    }
5706
5707    fn is_async(&self) -> bool {
5708        self.operator.is_async()
5709    }
5710
5711    fn is_input(&self) -> bool {
5712        self.operator.is_input()
5713    }
5714
5715    fn ready(&self) -> bool {
5716        self.operator.ready()
5717    }
5718
5719    fn register_ready_callback(&mut self, cb: Box<dyn Fn() + Send + Sync>) {
5720        self.operator.register_ready_callback(cb);
5721    }
5722
5723    // Justification: see StreamValue::take() comment.
5724    #[allow(clippy::await_holding_refcell_ref)]
5725    fn eval<'a>(
5726        &'a mut self,
5727    ) -> Pin<Box<dyn Future<Output = Result<Option<Position>, SchedulerError>> + 'a>> {
5728        Box::pin(async {
5729            {
5730                self.output_stream.put(
5731                    self.operator
5732                        .eval(
5733                            Cow::Borrowed(StreamValue::peek(&self.input_stream1.get())),
5734                            Cow::Borrowed(StreamValue::peek(&self.input_stream2.get())),
5735                            Cow::Borrowed(StreamValue::peek(&self.input_stream3.get())),
5736                        )
5737                        .await,
5738                );
5739            }
5740
5741            StreamValue::consume_token(self.input_stream1.val());
5742            StreamValue::consume_token(self.input_stream2.val());
5743            StreamValue::consume_token(self.input_stream3.val());
5744
5745            Ok(self.operator.flush_progress())
5746        })
5747    }
5748
5749    fn start_transaction(&mut self) {
5750        self.operator.start_transaction();
5751    }
5752
5753    fn flush(&mut self) {
5754        self.operator.flush();
5755    }
5756
5757    fn is_flush_complete(&self) -> bool {
5758        self.operator.is_flush_complete()
5759    }
5760
5761    fn clock_start(&mut self, scope: Scope) {
5762        self.operator.clock_start(scope);
5763    }
5764
5765    fn clock_end(&mut self, scope: Scope) {
5766        self.operator.clock_end(scope);
5767    }
5768
5769    fn init(&mut self) {
5770        self.operator.init(&self.id);
5771    }
5772
5773    fn metadata(&self, output: &mut OperatorMeta) {
5774        self.operator.metadata(output);
5775    }
5776
5777    fn fixedpoint(&self, scope: Scope) -> bool {
5778        self.operator.fixedpoint(scope)
5779    }
5780
5781    fn checkpoint(
5782        &mut self,
5783        base: &StoragePath,
5784        files: &mut Vec<Arc<dyn FileCommitter>>,
5785    ) -> Result<(), DbspError> {
5786        self.operator
5787            .checkpoint(base, self.persistent_id().as_deref(), files)
5788    }
5789
5790    fn restore(&mut self, base: &StoragePath) -> Result<(), DbspError> {
5791        self.operator.restore(base, self.persistent_id().as_deref())
5792    }
5793
5794    fn clear_state(&mut self) -> Result<(), DbspError> {
5795        self.operator.clear_state()
5796    }
5797
5798    fn start_replay(&mut self) -> Result<(), DbspError> {
5799        self.operator.start_replay()
5800    }
5801
5802    fn is_replay_complete(&self) -> bool {
5803        self.operator.is_replay_complete()
5804    }
5805
5806    fn end_replay(&mut self) -> Result<(), DbspError> {
5807        self.operator.end_replay()
5808    }
5809
5810    fn set_label(&mut self, key: &str, value: &str) {
5811        self.labels.insert(key.to_string(), value.to_string());
5812    }
5813
5814    fn get_label(&self, key: &str) -> Option<&str> {
5815        self.labels.get(key).map(|s| s.as_str())
5816    }
5817
5818    fn labels(&self) -> &BTreeMap<String, String> {
5819        &self.labels
5820    }
5821
5822    fn as_any(&self) -> &dyn Any {
5823        self
5824    }
5825}
5826
5827struct QuaternaryNode<C, I1, I2, I3, I4, O, Op> {
5828    id: GlobalNodeId,
5829    operator: Op,
5830    input_stream1: Stream<C, I1>,
5831    input_stream2: Stream<C, I2>,
5832    input_stream3: Stream<C, I3>,
5833    input_stream4: Stream<C, I4>,
5834    output_stream: Stream<C, O>,
5835    labels: BTreeMap<String, String>,
5836    // // `true` if `input_stream1` is an alias to `input_stream2`, `input_stream3` or
5837    // // `input_stream4`.
5838    // is_alias1: bool,
5839    // // `true` if `input_stream2` is an alias to `input_stream3` or `input_stream4`.
5840    // is_alias2: bool,
5841    // // `true` if `input_stream3` is an alias to `input_stream4`.
5842    // is_alias3: bool,
5843}
5844
5845impl<C, I1, I2, I3, I4, O, Op> QuaternaryNode<C, I1, I2, I3, I4, O, Op>
5846where
5847    I1: Clone,
5848    I2: Clone,
5849    I3: Clone,
5850    I4: Clone,
5851    Op: QuaternaryOperator<I1, I2, I3, I4, O>,
5852    C: Circuit,
5853{
5854    fn new(
5855        operator: Op,
5856        input_stream1: Stream<C, I1>,
5857        input_stream2: Stream<C, I2>,
5858        input_stream3: Stream<C, I3>,
5859        input_stream4: Stream<C, I4>,
5860        circuit: C,
5861        id: NodeId,
5862    ) -> Self {
5863        // let is_alias1 = input_stream1.ptr_eq(&input_stream2)
5864        //     || input_stream1.ptr_eq(&input_stream3)
5865        //     || input_stream1.ptr_eq(&input_stream4);
5866        // let is_alias2 =
5867        //     input_stream2.ptr_eq(&input_stream3) || input_stream2.ptr_eq(&input_stream4);
5868        // let is_alias3 = input_stream3.ptr_eq(&input_stream4);
5869        Self {
5870            id: circuit.global_node_id().child(id),
5871            operator,
5872            input_stream1,
5873            input_stream2,
5874            input_stream3,
5875            input_stream4,
5876            // is_alias1,
5877            // is_alias2,
5878            // is_alias3,
5879            output_stream: Stream::new(circuit, id),
5880            labels: BTreeMap::new(),
5881        }
5882    }
5883
5884    fn output_stream(&self) -> Stream<C, O> {
5885        self.output_stream.clone()
5886    }
5887}
5888
5889impl<C, I1, I2, I3, I4, O, Op> Node for QuaternaryNode<C, I1, I2, I3, I4, O, Op>
5890where
5891    C: Circuit,
5892    I1: Clone + 'static,
5893    I2: Clone + 'static,
5894    I3: Clone + 'static,
5895    I4: Clone + 'static,
5896    O: Clone + 'static,
5897    Op: QuaternaryOperator<I1, I2, I3, I4, O>,
5898{
5899    fn name(&self) -> Cow<'static, str> {
5900        self.operator.name()
5901    }
5902
5903    fn local_id(&self) -> NodeId {
5904        self.id.local_node_id().unwrap()
5905    }
5906
5907    fn global_id(&self) -> &GlobalNodeId {
5908        &self.id
5909    }
5910
5911    fn is_async(&self) -> bool {
5912        self.operator.is_async()
5913    }
5914
5915    fn is_input(&self) -> bool {
5916        self.operator.is_input()
5917    }
5918
5919    fn ready(&self) -> bool {
5920        self.operator.ready()
5921    }
5922
5923    fn register_ready_callback(&mut self, cb: Box<dyn Fn() + Send + Sync>) {
5924        self.operator.register_ready_callback(cb);
5925    }
5926
5927    // Justification: see StreamValue::take() comment.
5928    #[allow(clippy::await_holding_refcell_ref)]
5929    fn eval<'a>(
5930        &'a mut self,
5931    ) -> Pin<Box<dyn Future<Output = Result<Option<Position>, SchedulerError>> + 'a>> {
5932        Box::pin(async {
5933            {
5934                self.output_stream.put(
5935                    self.operator
5936                        .eval(
5937                            Cow::Borrowed(StreamValue::peek(&self.input_stream1.get())),
5938                            Cow::Borrowed(StreamValue::peek(&self.input_stream2.get())),
5939                            Cow::Borrowed(StreamValue::peek(&self.input_stream3.get())),
5940                            Cow::Borrowed(StreamValue::peek(&self.input_stream4.get())),
5941                        )
5942                        .await,
5943                );
5944            }
5945
5946            StreamValue::consume_token(self.input_stream1.val());
5947            StreamValue::consume_token(self.input_stream2.val());
5948            StreamValue::consume_token(self.input_stream3.val());
5949            StreamValue::consume_token(self.input_stream4.val());
5950
5951            Ok(self.operator.flush_progress())
5952        })
5953    }
5954
5955    fn start_transaction(&mut self) {
5956        self.operator.start_transaction();
5957    }
5958
5959    fn flush(&mut self) {
5960        self.operator.flush();
5961    }
5962
5963    fn is_flush_complete(&self) -> bool {
5964        self.operator.is_flush_complete()
5965    }
5966
5967    fn clock_start(&mut self, scope: Scope) {
5968        self.operator.clock_start(scope);
5969    }
5970
5971    fn clock_end(&mut self, scope: Scope) {
5972        self.operator.clock_end(scope);
5973    }
5974
5975    fn init(&mut self) {
5976        self.operator.init(&self.id);
5977    }
5978
5979    fn metadata(&self, output: &mut OperatorMeta) {
5980        self.operator.metadata(output);
5981    }
5982
5983    fn fixedpoint(&self, scope: Scope) -> bool {
5984        self.operator.fixedpoint(scope)
5985    }
5986
5987    fn checkpoint(
5988        &mut self,
5989        base: &StoragePath,
5990        files: &mut Vec<Arc<dyn FileCommitter>>,
5991    ) -> Result<(), DbspError> {
5992        self.operator
5993            .checkpoint(base, self.persistent_id().as_deref(), files)
5994    }
5995
5996    fn restore(&mut self, base: &StoragePath) -> Result<(), DbspError> {
5997        self.operator.restore(base, self.persistent_id().as_deref())
5998    }
5999
6000    fn clear_state(&mut self) -> Result<(), DbspError> {
6001        self.operator.clear_state()
6002    }
6003
6004    fn start_replay(&mut self) -> Result<(), DbspError> {
6005        self.operator.start_replay()
6006    }
6007
6008    fn is_replay_complete(&self) -> bool {
6009        self.operator.is_replay_complete()
6010    }
6011
6012    fn end_replay(&mut self) -> Result<(), DbspError> {
6013        self.operator.end_replay()
6014    }
6015
6016    fn set_label(&mut self, key: &str, value: &str) {
6017        self.labels.insert(key.to_string(), value.to_string());
6018    }
6019
6020    fn get_label(&self, key: &str) -> Option<&str> {
6021        self.labels.get(key).map(|s| s.as_str())
6022    }
6023
6024    fn labels(&self) -> &BTreeMap<String, String> {
6025        &self.labels
6026    }
6027
6028    fn as_any(&self) -> &dyn Any {
6029        self
6030    }
6031}
6032
6033struct NaryNode<C, I, O, Op>
6034where
6035    I: Clone + 'static,
6036{
6037    id: GlobalNodeId,
6038    operator: Op,
6039    // The second field of the tuple indicates if the stream is an
6040    // alias to an earlier stream.
6041    input_streams: Vec<Stream<C, I>>,
6042    // // Streams that are aliases.
6043    // aliases: Vec<usize>,
6044    output_stream: Stream<C, O>,
6045    labels: BTreeMap<String, String>,
6046}
6047
6048impl<C, I, O, Op> NaryNode<C, I, O, Op>
6049where
6050    I: Clone + 'static,
6051    Op: NaryOperator<I, O>,
6052    C: Circuit,
6053{
6054    fn new<Iter>(operator: Op, input_streams: Iter, circuit: C, id: NodeId) -> Self
6055    where
6056        Iter: IntoIterator<Item = Stream<C, I>>,
6057    {
6058        let mut input_streams: Vec<_> = input_streams.into_iter().collect();
6059        // let mut aliases = Vec::new();
6060        // for i in 0..input_streams.len() {
6061        //     for j in 0..i {
6062        //         if input_streams[i].0.ptr_eq(&input_streams[j].0) {
6063        //             input_streams[i].1 = true;
6064        //             aliases.push(i);
6065        //             break;
6066        //         }
6067        //     }
6068        // }
6069        //aliases.shrink_to_fit();
6070        input_streams.shrink_to_fit();
6071        Self {
6072            id: circuit.global_node_id().child(id),
6073            operator,
6074            input_streams,
6075            //aliases,
6076            output_stream: Stream::new(circuit, id),
6077            labels: BTreeMap::new(),
6078        }
6079    }
6080
6081    fn output_stream(&self) -> Stream<C, O> {
6082        self.output_stream.clone()
6083    }
6084}
6085
6086impl<C, I, O, Op> Node for NaryNode<C, I, O, Op>
6087where
6088    C: Circuit,
6089    I: Clone,
6090    O: Clone + 'static,
6091    Op: NaryOperator<I, O>,
6092{
6093    fn name(&self) -> Cow<'static, str> {
6094        self.operator.name()
6095    }
6096
6097    fn local_id(&self) -> NodeId {
6098        self.id.local_node_id().unwrap()
6099    }
6100
6101    fn global_id(&self) -> &GlobalNodeId {
6102        &self.id
6103    }
6104
6105    fn is_async(&self) -> bool {
6106        self.operator.is_async()
6107    }
6108
6109    fn is_input(&self) -> bool {
6110        self.operator.is_input()
6111    }
6112
6113    fn ready(&self) -> bool {
6114        self.operator.ready()
6115    }
6116
6117    fn register_ready_callback(&mut self, cb: Box<dyn Fn() + Send + Sync>) {
6118        self.operator.register_ready_callback(cb);
6119    }
6120
6121    fn eval<'a>(
6122        &'a mut self,
6123    ) -> Pin<Box<dyn Future<Output = Result<Option<Position>, SchedulerError>> + 'a>> {
6124        Box::pin(async {
6125            let refs = self
6126                .input_streams
6127                .iter()
6128                .map(|stream| stream.get())
6129                .collect::<Vec<_>>();
6130
6131            self.output_stream.put(
6132                self.operator
6133                    .eval(refs.iter().map(|r| Cow::Borrowed(StreamValue::peek(r))))
6134                    .await,
6135            );
6136
6137            std::mem::drop(refs);
6138
6139            for i in self.input_streams.iter() {
6140                StreamValue::consume_token(i.val());
6141            }
6142            Ok(self.operator.flush_progress())
6143        })
6144    }
6145
6146    fn start_transaction(&mut self) {
6147        self.operator.start_transaction();
6148    }
6149
6150    fn flush(&mut self) {
6151        self.operator.flush();
6152    }
6153
6154    fn is_flush_complete(&self) -> bool {
6155        self.operator.is_flush_complete()
6156    }
6157
6158    fn clock_start(&mut self, scope: Scope) {
6159        self.operator.clock_start(scope);
6160    }
6161
6162    fn clock_end(&mut self, scope: Scope) {
6163        self.operator.clock_end(scope);
6164    }
6165
6166    fn init(&mut self) {
6167        self.operator.init(&self.id);
6168    }
6169
6170    fn metadata(&self, output: &mut OperatorMeta) {
6171        self.operator.metadata(output);
6172    }
6173
6174    fn fixedpoint(&self, scope: Scope) -> bool {
6175        self.operator.fixedpoint(scope)
6176    }
6177
6178    fn checkpoint(
6179        &mut self,
6180        base: &StoragePath,
6181        files: &mut Vec<Arc<dyn FileCommitter>>,
6182    ) -> Result<(), DbspError> {
6183        self.operator
6184            .checkpoint(base, self.persistent_id().as_deref(), files)
6185    }
6186
6187    fn restore(&mut self, base: &StoragePath) -> Result<(), DbspError> {
6188        self.operator.restore(base, self.persistent_id().as_deref())
6189    }
6190
6191    fn clear_state(&mut self) -> Result<(), DbspError> {
6192        self.operator.clear_state()
6193    }
6194
6195    fn start_replay(&mut self) -> Result<(), DbspError> {
6196        self.operator.start_replay()
6197    }
6198
6199    fn is_replay_complete(&self) -> bool {
6200        self.operator.is_replay_complete()
6201    }
6202
6203    fn end_replay(&mut self) -> Result<(), DbspError> {
6204        self.operator.end_replay()
6205    }
6206
6207    fn set_label(&mut self, key: &str, value: &str) {
6208        self.labels.insert(key.to_string(), value.to_string());
6209    }
6210
6211    fn get_label(&self, key: &str) -> Option<&str> {
6212        self.labels.get(key).map(|s| s.as_str())
6213    }
6214
6215    fn labels(&self) -> &BTreeMap<String, String> {
6216        &self.labels
6217    }
6218
6219    fn as_any(&self) -> &dyn Any {
6220        self
6221    }
6222}
6223
6224// The output half of a feedback node.  We implement a feedback node using a
6225// pair of nodes: `FeedbackOutputNode` is connected to the circuit as a source
6226// node (i.e., it does not have an input stream) and thus gets evaluated first
6227// in each time stamp.  `FeedbackInputNode` is a sink node.  This way the
6228// circuit graph remains acyclic and can be scheduled in a topological order.
6229struct FeedbackOutputNode<C, I, O, Op>
6230where
6231    C: Circuit,
6232{
6233    id: GlobalNodeId,
6234    operator: Rc<RefCell<Op>>,
6235    output_stream: Stream<C, O>,
6236    export_stream: Option<Stream<C::Parent, O>>,
6237    phantom_input: PhantomData<I>,
6238    labels: BTreeMap<String, String>,
6239}
6240
6241impl<C, I, O, Op> FeedbackOutputNode<C, I, O, Op>
6242where
6243    C: Circuit,
6244    Op: StrictUnaryOperator<I, O>,
6245{
6246    fn new(operator: Rc<RefCell<Op>>, circuit: C, id: NodeId) -> Self {
6247        Self {
6248            id: circuit.global_node_id().child(id),
6249            operator,
6250            output_stream: Stream::new(circuit.clone(), id),
6251            export_stream: None,
6252            phantom_input: PhantomData,
6253            labels: BTreeMap::new(),
6254        }
6255    }
6256
6257    fn with_export(operator: Rc<RefCell<Op>>, circuit: C, id: NodeId) -> Self {
6258        let mut result = Self::new(operator, circuit.clone(), id);
6259        result.export_stream = Some(Stream::with_origin(
6260            circuit.parent(),
6261            circuit.allocate_stream_id(),
6262            circuit.node_id(),
6263            GlobalNodeId::child_of(&circuit, id),
6264        ));
6265        result
6266    }
6267
6268    fn output_stream(&self) -> Stream<C, O> {
6269        self.output_stream.clone()
6270    }
6271}
6272
6273impl<C, I, O, Op> Node for FeedbackOutputNode<C, I, O, Op>
6274where
6275    C: Circuit,
6276    I: Data,
6277    O: Clone + 'static,
6278    Op: StrictUnaryOperator<I, O>,
6279{
6280    fn local_id(&self) -> NodeId {
6281        self.id.local_node_id().unwrap()
6282    }
6283
6284    fn global_id(&self) -> &GlobalNodeId {
6285        &self.id
6286    }
6287
6288    fn name(&self) -> Cow<'static, str> {
6289        self.operator.borrow().name()
6290    }
6291
6292    fn is_async(&self) -> bool {
6293        self.operator.borrow().is_async()
6294    }
6295
6296    fn is_input(&self) -> bool {
6297        self.operator.borrow().is_input()
6298    }
6299
6300    fn ready(&self) -> bool {
6301        self.operator.borrow().ready()
6302    }
6303
6304    fn register_ready_callback(&mut self, cb: Box<dyn Fn() + Send + Sync>) {
6305        self.operator.borrow_mut().register_ready_callback(cb);
6306    }
6307
6308    fn eval<'a>(
6309        &'a mut self,
6310    ) -> Pin<Box<dyn Future<Output = Result<Option<Position>, SchedulerError>> + 'a>> {
6311        Box::pin(async {
6312            self.output_stream
6313                .put(self.operator.borrow_mut().get_output());
6314            Ok(None)
6315        })
6316    }
6317
6318    fn start_transaction(&mut self) {
6319        self.operator.borrow_mut().start_transaction();
6320    }
6321
6322    fn flush(&mut self) {
6323        self.operator.borrow_mut().flush();
6324    }
6325
6326    fn is_flush_complete(&self) -> bool {
6327        self.operator.borrow().is_flush_complete()
6328    }
6329
6330    fn clock_start(&mut self, scope: Scope) {
6331        self.operator.borrow_mut().clock_start(scope)
6332    }
6333
6334    fn clock_end(&mut self, scope: Scope) {
6335        if scope == 0
6336            && let Some(export_stream) = &mut self.export_stream
6337        {
6338            export_stream.put(self.operator.borrow_mut().get_final_output());
6339        }
6340        self.operator.borrow_mut().clock_end(scope);
6341    }
6342
6343    fn init(&mut self) {
6344        self.operator.borrow_mut().init(&self.id);
6345    }
6346
6347    fn metadata(&self, _output: &mut OperatorMeta) {
6348        // Avoid producing duplicate metadata for input and output parts of the operator;
6349        // otherwise it will be double-counted in circuit-level metrics.
6350    }
6351
6352    fn fixedpoint(&self, scope: Scope) -> bool {
6353        self.operator.borrow().fixedpoint(scope)
6354    }
6355
6356    fn checkpoint(
6357        &mut self,
6358        base: &StoragePath,
6359        files: &mut Vec<Arc<dyn FileCommitter>>,
6360    ) -> Result<(), DbspError> {
6361        self.operator
6362            .borrow_mut()
6363            .checkpoint(base, self.persistent_id().as_deref(), files)
6364    }
6365
6366    fn restore(&mut self, base: &StoragePath) -> Result<(), DbspError> {
6367        self.operator
6368            .borrow_mut()
6369            .restore(base, self.persistent_id().as_deref())
6370    }
6371
6372    fn clear_state(&mut self) -> Result<(), DbspError> {
6373        self.operator.borrow_mut().clear_state()
6374    }
6375
6376    fn start_replay(&mut self) -> Result<(), DbspError> {
6377        self.operator.borrow_mut().start_replay()
6378    }
6379
6380    fn is_replay_complete(&self) -> bool {
6381        self.operator.borrow().is_replay_complete()
6382    }
6383
6384    fn end_replay(&mut self) -> Result<(), DbspError> {
6385        self.operator.borrow_mut().end_replay()
6386    }
6387
6388    fn set_label(&mut self, key: &str, value: &str) {
6389        self.labels.insert(key.to_string(), value.to_string());
6390    }
6391
6392    fn get_label(&self, key: &str) -> Option<&str> {
6393        self.labels.get(key).map(|s| s.as_str())
6394    }
6395
6396    fn labels(&self) -> &BTreeMap<String, String> {
6397        &self.labels
6398    }
6399
6400    fn as_any(&self) -> &dyn Any {
6401        self
6402    }
6403}
6404
6405/// The input half of a feedback node
6406struct FeedbackInputNode<C, I, O, Op> {
6407    // Id of this node (the input half).
6408    id: GlobalNodeId,
6409    operator: Rc<RefCell<Op>>,
6410    input_stream: Stream<C, I>,
6411    phantom_output: PhantomData<O>,
6412    labels: BTreeMap<String, String>,
6413}
6414
6415impl<C, I, O, Op> FeedbackInputNode<C, I, O, Op>
6416where
6417    Op: StrictUnaryOperator<I, O>,
6418    C: Circuit,
6419{
6420    fn new(operator: Rc<RefCell<Op>>, input_stream: Stream<C, I>, id: NodeId) -> Self {
6421        Self {
6422            id: input_stream.circuit().global_node_id().child(id),
6423            operator,
6424            input_stream,
6425            phantom_output: PhantomData,
6426            labels: BTreeMap::new(),
6427        }
6428    }
6429}
6430
6431impl<C, I, O, Op> Node for FeedbackInputNode<C, I, O, Op>
6432where
6433    Op: StrictUnaryOperator<I, O>,
6434    I: Data,
6435    O: 'static,
6436    C: Clone + 'static,
6437{
6438    fn name(&self) -> Cow<'static, str> {
6439        self.operator.borrow().name()
6440    }
6441
6442    fn local_id(&self) -> NodeId {
6443        self.id.local_node_id().unwrap()
6444    }
6445
6446    fn global_id(&self) -> &GlobalNodeId {
6447        &self.id
6448    }
6449
6450    fn is_async(&self) -> bool {
6451        self.operator.borrow().is_async()
6452    }
6453
6454    fn is_input(&self) -> bool {
6455        self.operator.borrow().is_input()
6456    }
6457
6458    fn ready(&self) -> bool {
6459        self.operator.borrow().ready()
6460    }
6461
6462    fn register_ready_callback(&mut self, cb: Box<dyn Fn() + Send + Sync>) {
6463        self.operator.borrow_mut().register_ready_callback(cb);
6464    }
6465
6466    // Justification: see StreamValue::take() comment.
6467    #[allow(clippy::await_holding_refcell_ref)]
6468    fn eval<'a>(
6469        &'a mut self,
6470    ) -> Pin<Box<dyn Future<Output = Result<Option<Position>, SchedulerError>> + 'a>> {
6471        Box::pin(async {
6472            match StreamValue::take(self.input_stream.val()) {
6473                Some(v) => self.operator.borrow_mut().eval_strict_owned(v).await,
6474                None => {
6475                    self.operator
6476                        .borrow_mut()
6477                        .eval_strict(StreamValue::peek(&self.input_stream.get()))
6478                        .await
6479                }
6480            };
6481
6482            StreamValue::consume_token(self.input_stream.val());
6483
6484            Ok(None)
6485        })
6486    }
6487
6488    fn start_transaction(&mut self) {
6489        self.operator.borrow_mut().start_transaction();
6490    }
6491
6492    fn flush(&mut self) {
6493        self.operator.borrow_mut().flush();
6494    }
6495
6496    fn is_flush_complete(&self) -> bool {
6497        self.operator.borrow().is_flush_complete()
6498    }
6499
6500    // Don't call `clock_start`/`clock_end` on the operator.  `FeedbackOutputNode`
6501    // will do that.
6502    fn clock_start(&mut self, _scope: Scope) {}
6503
6504    fn clock_end(&mut self, _scope: Scope) {}
6505
6506    fn init(&mut self) {
6507        self.operator.borrow_mut().init(&self.id);
6508    }
6509
6510    fn metadata(&self, output: &mut OperatorMeta) {
6511        self.operator.borrow().metadata(output)
6512    }
6513
6514    fn fixedpoint(&self, scope: Scope) -> bool {
6515        self.operator.borrow().fixedpoint(scope)
6516    }
6517
6518    fn checkpoint(
6519        &mut self,
6520        _base: &StoragePath,
6521        _files: &mut Vec<Arc<dyn FileCommitter>>,
6522    ) -> Result<(), DbspError> {
6523        // The Z-1 operator consists of two logical parts.
6524        // The first part gets invoked at the start of a clock cycle to retrieve the
6525        // state stored at the previous clock tick. The second one gets invoked
6526        // to store the updated state inside the operator. We only want to
6527        // invoke commit on one of them, doesn't matter which (so we
6528        // do it in FeedbackOutputNode)
6529        Ok(())
6530    }
6531
6532    fn restore(&mut self, _base: &StoragePath) -> Result<(), DbspError> {
6533        // See comment in `commit`.
6534        Ok(())
6535    }
6536
6537    fn clear_state(&mut self) -> Result<(), DbspError> {
6538        Ok(())
6539    }
6540
6541    fn start_replay(&mut self) -> Result<(), DbspError> {
6542        self.operator.borrow_mut().start_replay()
6543    }
6544
6545    fn is_replay_complete(&self) -> bool {
6546        self.operator.borrow().is_replay_complete()
6547    }
6548
6549    fn end_replay(&mut self) -> Result<(), DbspError> {
6550        self.operator.borrow_mut().end_replay()
6551    }
6552
6553    fn set_label(&mut self, key: &str, value: &str) {
6554        self.labels.insert(key.to_string(), value.to_string());
6555    }
6556
6557    fn get_label(&self, key: &str) -> Option<&str> {
6558        self.labels.get(key).map(|s| s.as_str())
6559    }
6560
6561    fn labels(&self) -> &BTreeMap<String, String> {
6562        &self.labels
6563    }
6564
6565    fn as_any(&self) -> &dyn Any {
6566        self
6567    }
6568}
6569
6570/// Input connector of a feedback operator.
6571///
6572/// This struct is part of the mechanism for constructing a feedback loop in a
6573/// circuit. It is returned by [`Circuit::add_feedback`] and represents the
6574/// input port of an operator whose input stream does not exist yet.  Once the
6575/// input stream has been created, it can be connected to the operator using
6576/// [`FeedbackConnector::connect`]. See [`Circuit::add_feedback`] for details.
6577pub struct FeedbackConnector<C, I, O, Op> {
6578    output_node_id: NodeId,
6579    circuit: C,
6580    operator: Rc<RefCell<Op>>,
6581    phantom_input: PhantomData<I>,
6582    phantom_output: PhantomData<O>,
6583}
6584
6585impl<C, I, O, Op> FeedbackConnector<C, I, O, Op>
6586where
6587    Op: StrictUnaryOperator<I, O>,
6588{
6589    fn new(output_node_id: NodeId, circuit: C, operator: Rc<RefCell<Op>>) -> Self {
6590        Self {
6591            output_node_id,
6592            circuit,
6593            operator,
6594            phantom_input: PhantomData,
6595            phantom_output: PhantomData,
6596        }
6597    }
6598}
6599
6600impl<C, I, O, Op> FeedbackConnector<C, I, O, Op>
6601where
6602    Op: StrictUnaryOperator<I, O>,
6603    I: Data,
6604    O: Data,
6605    C: Circuit,
6606{
6607    pub fn operator_mut(&self) -> RefMut<'_, Op> {
6608        self.operator.borrow_mut()
6609    }
6610
6611    /// Connect `input_stream` as input to the operator.
6612    ///
6613    /// See [`Circuit::add_feedback`] for details.
6614    /// Returns node id of the input node.
6615    pub fn connect(self, input_stream: &Stream<C, I>) {
6616        self.connect_with_preference(input_stream, OwnershipPreference::INDIFFERENT)
6617    }
6618
6619    pub fn connect_with_preference(
6620        self,
6621        input_stream: &Stream<C, I>,
6622        input_preference: OwnershipPreference,
6623    ) {
6624        self.circuit.connect_feedback_with_preference(
6625            self.output_node_id,
6626            self.operator,
6627            input_stream,
6628            input_preference,
6629        )
6630    }
6631}
6632
6633// A nested circuit instantiated as a node in a parent circuit.
6634struct ChildNode<C>
6635where
6636    C: Circuit,
6637{
6638    id: GlobalNodeId,
6639    circuit: C,
6640    executor: Box<dyn Executor<C>>,
6641    labels: BTreeMap<String, String>,
6642    nesting_depth: Scope,
6643}
6644
6645impl<C> Drop for ChildNode<C>
6646where
6647    C: Circuit,
6648{
6649    fn drop(&mut self) {
6650        // Explicitly deallocate all nodes in the circuit to break
6651        // cyclic `Rc` references between circuits and streams.
6652        self.circuit.clear();
6653    }
6654}
6655
6656impl<C> ChildNode<C>
6657where
6658    C: Circuit,
6659{
6660    fn new<E>(circuit: C, nesting_depth: Scope, executor: E) -> Self
6661    where
6662        E: Executor<C>,
6663    {
6664        Self {
6665            id: circuit.global_node_id(),
6666            circuit,
6667            executor: Box::new(executor) as Box<dyn Executor<C>>,
6668            labels: BTreeMap::new(),
6669            nesting_depth,
6670        }
6671    }
6672}
6673
6674impl<C> Node for ChildNode<C>
6675where
6676    C: Circuit,
6677{
6678    fn name(&self) -> Cow<'static, str> {
6679        Cow::Borrowed("Subcircuit")
6680    }
6681
6682    fn local_id(&self) -> NodeId {
6683        self.id.local_node_id().unwrap()
6684    }
6685
6686    fn global_id(&self) -> &GlobalNodeId {
6687        &self.id
6688    }
6689
6690    fn is_circuit(&self) -> bool {
6691        true
6692    }
6693
6694    fn is_async(&self) -> bool {
6695        false
6696    }
6697
6698    fn is_input(&self) -> bool {
6699        false
6700    }
6701
6702    fn ready(&self) -> bool {
6703        true
6704    }
6705
6706    fn eval<'a>(
6707        &'a mut self,
6708    ) -> Pin<Box<dyn Future<Output = Result<Option<Position>, SchedulerError>> + 'a>> {
6709        // We may want to make the executor responsible for evaluating import nodes
6710        // if there is a need for customizing this behavior.
6711        for node_id in self.circuit.import_nodes() {
6712            self.circuit.eval_import_node(node_id)
6713        }
6714        Box::pin(async {
6715            self.executor.transaction(&self.circuit).await?;
6716            Ok(None)
6717        })
6718    }
6719
6720    fn start_transaction(&mut self) {
6721        // Nested circuit has its own transactions.
6722    }
6723
6724    fn flush(&mut self) {
6725        self.executor.start_commit_transaction().unwrap();
6726    }
6727
6728    fn is_flush_complete(&self) -> bool {
6729        self.executor.is_commit_complete()
6730    }
6731
6732    fn clock_start(&mut self, scope: Scope) {
6733        self.circuit.clock_start(scope + self.nesting_depth);
6734    }
6735
6736    fn clock_end(&mut self, scope: Scope) {
6737        self.circuit.clock_end(scope + self.nesting_depth);
6738    }
6739
6740    fn metadata(&self, _meta: &mut OperatorMeta) {}
6741
6742    fn fixedpoint(&self, scope: Scope) -> bool {
6743        self.circuit.check_fixedpoint(scope + self.nesting_depth)
6744    }
6745
6746    fn map_nodes_recursive(
6747        &self,
6748        f: &mut dyn FnMut(&dyn Node) -> Result<(), DbspError>,
6749    ) -> Result<(), DbspError> {
6750        self.circuit.map_nodes_recursive(f)
6751    }
6752
6753    fn checkpoint(
6754        &mut self,
6755        _base: &StoragePath,
6756        _files: &mut Vec<Arc<dyn FileCommitter>>,
6757    ) -> Result<(), DbspError> {
6758        Ok(())
6759    }
6760
6761    fn restore(&mut self, _base: &StoragePath) -> Result<(), DbspError> {
6762        Ok(())
6763    }
6764
6765    fn clear_state(&mut self) -> Result<(), DbspError> {
6766        self.circuit
6767            .map_local_nodes_mut(&mut |node| node.clear_state())
6768    }
6769
6770    fn start_replay(&mut self) -> Result<(), DbspError> {
6771        Ok(())
6772    }
6773
6774    fn is_replay_complete(&self) -> bool {
6775        true
6776    }
6777
6778    fn end_replay(&mut self) -> Result<(), DbspError> {
6779        Ok(())
6780    }
6781
6782    fn set_label(&mut self, key: &str, value: &str) {
6783        self.labels.insert(key.to_string(), value.to_string());
6784    }
6785
6786    fn get_label(&self, key: &str) -> Option<&str> {
6787        self.labels.get(key).map(|s| s.as_str())
6788    }
6789
6790    fn labels(&self) -> &BTreeMap<String, String> {
6791        &self.labels
6792    }
6793
6794    fn map_child(&self, path: &[NodeId], f: &mut dyn FnMut(&dyn Node)) {
6795        self.circuit.map_node_relative(path, f);
6796    }
6797
6798    fn map_child_mut(&self, path: &[NodeId], f: &mut dyn FnMut(&mut dyn Node)) {
6799        self.circuit.map_node_mut_relative(path, f);
6800    }
6801
6802    fn as_any(&self) -> &dyn Any {
6803        self
6804    }
6805
6806    fn as_circuit(&self) -> Option<&dyn CircuitBase> {
6807        Some(&self.circuit)
6808    }
6809}
6810
6811/// Top-level circuit with executor.
6812///
6813/// This is the interface to a circuit created with [`RootCircuit::build`].
6814/// Call [`CircuitHandle::transaction`] to run the circuit in the context of the
6815/// current thread.
6816pub struct CircuitHandle {
6817    circuit: RootCircuit,
6818    executor: Box<dyn Executor<RootCircuit>>,
6819    tokio_runtime: TokioRuntime,
6820    replay_info: Option<BootstrapInfo>,
6821}
6822
6823impl Drop for CircuitHandle {
6824    fn drop(&mut self) {
6825        self.circuit
6826            .log_scheduler_event(&SchedulerEvent::clock_end());
6827
6828        // Prevent nested panic when `drop` is invoked while panicking
6829        // and `clock_end` triggers another panic due to violated invariants
6830        // since the original panic interrupted normal execution.
6831        if !panicking() {
6832            self.circuit.clock_end(0)
6833        }
6834
6835        // We must explicitly deallocate all nodes in the circuit to break
6836        // cyclic `Rc` references between circuits and streams.  Alternatively,
6837        // we could use weak references to break cycles, but we'd have to
6838        // pay the cost of upgrading weak references on each access.
6839        self.circuit.clear();
6840    }
6841}
6842
6843/// Operators involved in the replay phase of a circuit.
6844#[derive(Debug, Clone, PartialEq, Eq, Serialize)]
6845pub struct BootstrapInfo {
6846    /// Operators that will replay their contents during the replay phase.
6847    pub replay_sources: BTreeMap<NodeId, StreamId>,
6848
6849    /// Operators that require backfill from upstream nodes, including their persistent IDs.
6850    #[allow(dead_code)]
6851    pub need_backfill: BTreeMap<NodeId, Option<String>>,
6852}
6853
6854impl CircuitHandle {
6855    /// Start and instantly commit a transaction, waiting for the commit to complete.
6856    pub fn transaction(&self) -> Result<(), DbspError> {
6857        self.tokio_runtime
6858            .block_on(async {
6859                let local_set = LocalSet::new();
6860                local_set
6861                    .run_until(async { self.executor.transaction(&self.circuit).await })
6862                    .await
6863            })
6864            .map_err(DbspError::Scheduler)
6865    }
6866
6867    /// Start a transaction.
6868    ///
6869    /// A transaction consists of a sequence of steps that evaluate a set of inputs for a single logical
6870    /// clock tick.
6871    pub fn start_transaction(&self) -> Result<(), DbspError> {
6872        self.tokio_runtime
6873            .block_on(async {
6874                let local_set = LocalSet::new();
6875                local_set
6876                    .run_until(async { self.executor.start_transaction(&self.circuit).await })
6877                    .await
6878            })
6879            .map_err(DbspError::Scheduler)
6880    }
6881
6882    /// Start committing the current transaction by forcing all operators to process
6883    /// their inputs to completion.
6884    ///
6885    /// The caller must invoke `step` repeatedly until the commit is complete.
6886    pub fn start_commit_transaction(&self) -> Result<(), DbspError> {
6887        self.executor
6888            .start_commit_transaction()
6889            .map_err(DbspError::Scheduler)
6890    }
6891
6892    pub fn is_commit_complete(&self) -> bool {
6893        self.executor.is_commit_complete()
6894    }
6895
6896    pub fn commit_progress(&self) -> CommitProgress {
6897        self.executor.commit_progress()
6898    }
6899
6900    /// Evaluate the circuit for a single step.
6901    pub fn step(&self) -> Result<(), DbspError> {
6902        self.tokio_runtime
6903            .block_on(async {
6904                let local_set = LocalSet::new();
6905                local_set
6906                    .run_until(async { self.executor.step(&self.circuit).await })
6907                    .await
6908            })
6909            .map_err(DbspError::Scheduler)
6910    }
6911
6912    pub fn checkpoint(
6913        &mut self,
6914        base: &StoragePath,
6915        files: &mut Vec<Arc<dyn FileCommitter>>,
6916    ) -> Result<(), DbspError> {
6917        // if Runtime::worker_index() == 0 {
6918        //     self.circuit.to_dot_file(
6919        //         |node| {
6920        //             Some(crate::utils::DotNodeAttributes::new().with_label(&format!(
6921        //                 "{}-{}-{}",
6922        //                 node.local_id(),
6923        //                 node.name(),
6924        //                 node.persistent_id().unwrap_or_default()
6925        //             )))
6926        //         },
6927        //         |edge| {
6928        //             let style = if edge.is_dependency() {
6929        //                 Some("dotted".to_string())
6930        //             } else {
6931        //                 None
6932        //             };
6933        //             let label = if let Some(stream) = &edge.stream {
6934        //                 Some(format!("consumers: {}", stream.num_consumers()))
6935        //             } else {
6936        //                 None
6937        //             };
6938        //             Some(
6939        //                 crate::utils::DotEdgeAttributes::new(edge.stream_id())
6940        //                     .with_style(style)
6941        //                     .with_label(label),
6942        //             )
6943        //         },
6944        //         "commit.dot",
6945        //     );
6946        //     info!("CircuitHandle::commit: circuit written to commit.dot");
6947        // }
6948
6949        self.circuit
6950            .map_nodes_recursive_mut(&mut |node: &mut dyn Node| {
6951                DBSP_OPERATOR_COMMIT_LATENCY_MICROSECONDS
6952                    .record_callback(|| node.checkpoint(base, files))
6953            })
6954    }
6955
6956    /// Restores the circuit from a checkpoint.
6957    ///
6958    /// Restore the circuit from a checkpoint and prepare it to backfill new and
6959    /// modified parts of the circuit if necessary.
6960    ///
6961    /// 1. Find and restore the checkpointed state of each operator.
6962    /// 2. Identify stateful operators (such as integrals and output nodes) that don't have
6963    ///    a checkpoint and require backfill (the `need_backfill` set).
6964    /// 3. Iterate backward from `need_backfill` nodes to find all operators that
6965    ///    should participate in the replay phase of the circuit. Iteration stops when
6966    ///    reaching a stream whose contents can be replayed from an existing node
6967    ///    that has a checkpoint or an input node.
6968    /// 4. If the circuit requires backfill, prepare the circuit for replay by
6969    ///    configuring the scheduler to only schedule nodes that participate in
6970    ///    backfill.
6971    ///
6972    /// Returns `None` if the circuit does not require backfill; returns info about
6973    /// nodes that participate in backfill otherwise.
6974    ///
6975    /// * After calling this function, the client can invoke `step` repeatedly for replay to make progress.
6976    /// * Use `is_replay_complete` to determine whether the circuit has finished the replay.
6977    /// * Use `complete_replay` to finalize the replay phase and prepare the circuit for normal operation after replay is complete.
6978    pub fn restore(&mut self, base: &StoragePath) -> Result<Option<BootstrapInfo>, DbspError> {
6979        // Nodes that will act as replay sources during the replay phase of the circuit.
6980        let mut replay_sources: BTreeMap<NodeId, StreamId> = BTreeMap::new();
6981
6982        // Nodes that require backfill from upstream nodes.
6983        let mut need_backfill: BTreeSet<GlobalNodeId> = BTreeSet::new();
6984
6985        // debug!("CircuitHandle::restore: restoring from checkpoint {}", base);
6986
6987        // Initialize `need_backfill` to operators without a checkpoint.
6988        // Fail if there are any errors other than NotFound.
6989        // By the end of this, `need_backfill` will contain all new integrals and output
6990        // nodes that need backfill.
6991        self.circuit.map_nodes_recursive_mut(
6992            &mut |node: &mut dyn Node| match node.restore(base) {
6993                Err(e) if Runtime::mode() == Mode::Ephemeral => Err(e),
6994                Err(DbspError::Storage(ioerror)) if ioerror.kind() == ErrorKind::NotFound => {
6995                    need_backfill.insert(node.global_id().clone());
6996                    Ok(())
6997                }
6998                Err(DbspError::IO(ioerror)) if ioerror.kind() == ErrorKind::NotFound => {
6999                    need_backfill.insert(node.global_id().clone());
7000                    Ok(())
7001                }
7002                Err(e) => Err(e),
7003                Ok(()) => Ok(()),
7004            },
7005        )?;
7006
7007        // Additional nodes that must be backfilled to keep the balancer state consistent.
7008        let additional_need_backfill: BTreeSet<GlobalNodeId> =
7009            self.invalidate_balancer_clusters(&need_backfill);
7010        if Runtime::worker_index() == 0 {
7011            debug!(
7012                "CircuitHandle::restore: additional need backfill: {:?}",
7013                additional_need_backfill
7014            );
7015        }
7016        need_backfill.extend(additional_need_backfill);
7017
7018        debug!(
7019            "worker {}: CircuitHandle::restore: found {} operators that require backfill: {:?}",
7020            Runtime::worker_index(),
7021            need_backfill.len(),
7022            need_backfill.iter().cloned().collect::<Vec<GlobalNodeId>>()
7023        );
7024
7025        // We can only backfill a nested circuit as a whole, so if we encounter at least
7026        // one node in a nested circuit that needs backfill, we backfill the
7027        // entire circuit.
7028        let need_backfill = need_backfill
7029            .into_iter()
7030            .map(|gid| gid.top_level_ancestor())
7031            .collect::<BTreeSet<_>>();
7032
7033        // Iterate backward from `need_backfill` nodes to find all operators that
7034        // should participate in the replay.
7035
7036        // All nodes that participate in the replay phase, including replay sources and
7037        // nodes that need backfilling.
7038        let mut participate_in_backfill = need_backfill.clone();
7039
7040        // New nodes computed at each iteration.
7041        let mut participate_in_backfill_new = need_backfill.clone();
7042
7043        while !participate_in_backfill_new.is_empty() {
7044            participate_in_backfill_new = self.compute_replay_nodes_step(
7045                &mut replay_sources,
7046                &need_backfill,
7047                participate_in_backfill_new,
7048                &mut participate_in_backfill,
7049            )?;
7050        }
7051
7052        debug!(
7053            "worker {}: CircuitHandle::restore: replaying {} operators: {:?}\n  backfilling {} operators: {:?}\n  replay circuit consists of {} operators: {:?}",
7054            Runtime::worker_index(),
7055            replay_sources.len(),
7056            replay_sources.keys().cloned().collect::<Vec<NodeId>>(),
7057            need_backfill.len(),
7058            need_backfill.iter().cloned().collect::<Vec<NodeId>>(),
7059            participate_in_backfill.len(),
7060            participate_in_backfill
7061                .iter()
7062                .cloned()
7063                .collect::<Vec<NodeId>>()
7064        );
7065
7066        assert!(
7067            replay_sources
7068                .keys()
7069                .cloned()
7070                .collect::<BTreeSet<_>>()
7071                .intersection(&need_backfill)
7072                .collect::<Vec<_>>()
7073                .is_empty()
7074        );
7075
7076        // Nodes that will be backfilled from upstream nodes, including need_backfill nodes
7077        // and their transitive ancestors.
7078        let nodes_to_backfill = participate_in_backfill
7079            .difference(&replay_sources.keys().cloned().collect::<BTreeSet<_>>())
7080            .cloned()
7081            .collect::<BTreeSet<_>>();
7082
7083        if !participate_in_backfill.is_empty() {
7084            // Configure all `replay_nodes` to run in replay mode.
7085            for node_id in replay_sources.keys() {
7086                self.circuit
7087                    .map_local_node_mut(*node_id, &mut |node| node.start_replay())?;
7088            }
7089
7090            // Clear the state of `need_backfill` nodes.
7091            for node_id in nodes_to_backfill.iter() {
7092                self.circuit
7093                    .map_local_node_mut(*node_id, &mut |node| node.clear_state())?;
7094            }
7095
7096            // Prepare the scheduler to only run `participate_in_backfill`.
7097            self.executor
7098                .prepare(&self.circuit, Some(&participate_in_backfill))?;
7099
7100            // info!("CircuitHandle::restore: replay circuit is ready");
7101
7102            // if Runtime::worker_index() == 0 {
7103            //     self.circuit.to_dot_file(
7104            //         |node| {
7105            //             if !node.global_id().is_child_of(self.circuit.global_id()) {
7106            //                 return None;
7107            //             }
7108            //             let color = if replay_sources.contains_key(&node.local_id()) {
7109            //                 Some(0xff5555)
7110            //             } else if participate_in_backfill.contains(&node.local_id()) {
7111            //                 Some(0x5555ff)
7112            //             } else {
7113            //                 None
7114            //             };
7115            //             Some(crate::utils::DotNodeAttributes::new().with_color(color))
7116            //         },
7117            //         |edge| {
7118            //             let style = if edge.is_dependency() {
7119            //                 Some("dotted".to_string())
7120            //             } else {
7121            //                 None
7122            //             };
7123            //             let label = if let Some(stream) = &edge.stream {
7124            //                 Some(format!("consumers: {}", stream.num_consumers()))
7125            //             } else {
7126            //                 None
7127            //             };
7128            //             Some(
7129            //                 crate::utils::DotEdgeAttributes::new(edge.stream_id())
7130            //                     .with_style(style)
7131            //                     .with_label(label),
7132            //             )
7133            //         },
7134            //         "replay.dot",
7135            //     );
7136            //     println!("CircuitHandle::restore: replay circuit is written to replay.dot");
7137            // }
7138
7139            // Add persistent IDs to the need_backfill set.
7140            let need_backfill = nodes_to_backfill
7141                .iter()
7142                .map(|node_id| {
7143                    let pid = self.circuit.map_local_node_mut(*node_id, &mut |node| {
7144                        node.get_label(LABEL_PERSISTENT_OPERATOR_ID)
7145                            .map(|s| s.to_string())
7146                    });
7147
7148                    (*node_id, pid)
7149                })
7150                .collect::<BTreeMap<_, _>>();
7151
7152            let replay_info = BootstrapInfo {
7153                replay_sources: replay_sources.clone(),
7154                need_backfill,
7155            };
7156
7157            self.replay_info = Some(replay_info.clone());
7158
7159            Ok(Some(replay_info))
7160        } else {
7161            Ok(None)
7162        }
7163    }
7164
7165    /// Compute additional nodes whose state must be discarded and backfilled in order
7166    /// to ensure that the balancer state is consistent.
7167    ///
7168    /// The new version of the circuit may have a different join graph in which the partitioning
7169    /// policy used to create the checkpoint may no longer be valid.
7170    ///
7171    /// Example:
7172    ///
7173    /// Consider the following checkpointed circuit:
7174    ///
7175    /// s1 = join(a, b)
7176    /// s2 = join(c, d)
7177    ///
7178    /// where streams a and c are partitioned using PartitioningPolicy::Broadcast, and
7179    /// b and d are partitioned using PartitioningPolicy::Shard.
7180    ///
7181    /// The new circuit adds
7182    ///
7183    /// s3 = join(a, c)
7184    ///
7185    /// This new circuit is in an inconsistent state since we can't join two broadcast streams.
7186    ///
7187    /// This function computes a conservative approximation of the set of nodes whose state must
7188    /// be discarded and backfilled to avoid such inconsistencies:
7189    ///
7190    /// 1. For each join cluster, check if there exists a solution that extends the partitioning
7191    ///    policies of the nodes restores from the checkpoint.
7192    /// 2. If no solution exists, mark all nodes in the cluster and their transitive successors
7193    ///    as needing backfill.
7194    fn invalidate_balancer_clusters(
7195        &self,
7196        need_backfill: &BTreeSet<GlobalNodeId>,
7197    ) -> BTreeSet<GlobalNodeId> {
7198        // Convert GlobalNodeId to top-level NodeId for comparison with balancer data.
7199        let need_backfill_node_ids: BTreeSet<NodeId> = need_backfill
7200            .iter()
7201            .map(|gid| gid.top_level_ancestor())
7202            .collect();
7203
7204        // Compute exchange sender nodes that need to be discarded and backfilled.
7205        let additional_need_backfill = self
7206            .circuit
7207            .balancer()
7208            .invalidate_clusters_for_bootstrapping(&need_backfill_node_ids);
7209
7210        // Invalidate all successors as well; otherwise we can end up with nodes that are not
7211        // marked for backfill but their ancestors are.
7212        let nodes_to_add = self.propagate_need_backfill_forward(
7213            additional_need_backfill
7214                .difference(&need_backfill_node_ids)
7215                .cloned()
7216                .collect(),
7217        );
7218
7219        // Convert NodeIds back to GlobalNodeIds and add to additional_need_backfill
7220        nodes_to_add
7221            .into_iter()
7222            .map(|node_id| GlobalNodeId::root().child(node_id))
7223            .collect()
7224    }
7225
7226    /// Compute all transitive successors of need_backfill nodes.
7227    fn propagate_need_backfill_forward(
7228        &self,
7229        mut need_backfill: BTreeSet<NodeId>,
7230    ) -> BTreeSet<NodeId> {
7231        // Recursively add all successors of these exchange sender nodes
7232        let mut worklist: Vec<NodeId> = need_backfill.iter().cloned().collect();
7233        let mut visited = BTreeSet::new();
7234
7235        while let Some(node_id) = worklist.pop() {
7236            if visited.contains(&node_id) {
7237                continue;
7238            }
7239            visited.insert(node_id);
7240
7241            // Get all successors of this node
7242            let successors: Vec<NodeId> = self
7243                .circuit
7244                .edges()
7245                .by_source
7246                .get(&node_id)
7247                .into_iter()
7248                .flat_map(|edges| edges.iter().map(|edge| edge.to))
7249                .collect();
7250
7251            for successor in successors {
7252                if !visited.contains(&successor) {
7253                    worklist.push(successor);
7254                    need_backfill.insert(successor);
7255                }
7256            }
7257
7258            // Add all dependencies of this node. Makes sure that the output half of Z-1 is marked for backfill.
7259            let dependencies: Vec<NodeId> = self
7260                .circuit
7261                .edges()
7262                .by_destination
7263                .get(&node_id)
7264                .into_iter()
7265                .flat_map(|edges| edges.iter())
7266                .filter(|edge| edge.is_dependency())
7267                .map(|edge| edge.from)
7268                .collect();
7269
7270            for dependency in dependencies {
7271                if !visited.contains(&dependency) {
7272                    worklist.push(dependency);
7273                    need_backfill.insert(dependency);
7274                }
7275            }
7276        }
7277
7278        need_backfill
7279    }
7280
7281    /// Iterative step of computing the set of nodes that participate in the replay phase.
7282    ///
7283    /// Find all input streams of nodes in `participate_in_backfill_new`.
7284    ///
7285    /// * For streams that can be replayed from a replay source:
7286    ///  - add the replay source to `replay_sources` and `participate_in_backfill`.
7287    ///  - create replay streams from the replay source to each consumer of the original stream.
7288    /// * For other streams, add their origin nodes to `participate_in_backfill`.
7289    ///
7290    /// Return the set of nodes newly added to `participate_in_backfill`.
7291    fn compute_replay_nodes_step(
7292        &self,
7293        replay_sources: &mut BTreeMap<NodeId, StreamId>,
7294        need_backfill: &BTreeSet<NodeId>,
7295        participate_in_backfill_new: BTreeSet<NodeId>,
7296        participate_in_backfill: &mut BTreeSet<NodeId>,
7297    ) -> Result<BTreeSet<NodeId>, DbspError> {
7298        let mut inputs = BTreeSet::new();
7299
7300        for node_id in participate_in_backfill_new.iter() {
7301            // Compute immediate ancestors of node_id, including:
7302            // 1. Nodes connected to node_id by a stream.
7303            // 2. Nodes that depend on node_id -- makes sure that if we schedule the output half of a strict operator,
7304            //    we will also schedule the input half.
7305            // 3. Nodes that node_id depends on -- Makes sure that if we schedule the output of an exchange operator,
7306            //    we will schedule the input part as well.
7307
7308            // 1.
7309            let node_inputs = self
7310                .circuit
7311                .edges()
7312                .by_destination
7313                .get(node_id)
7314                .iter()
7315                .flat_map(|edges| edges.iter())
7316                .filter(|edge| edge.is_stream())
7317                .map(|edge| {
7318                    // If the origin of the stream is a node inside the nested circuit, we will clear and replay the
7319                    // entire nested circuit, as we don't currently have a way to replay state from inside a nested
7320                    // circuit into a parent circuit.
7321                    (Some(edge.stream_id().unwrap()), edge.from)
7322                })
7323                .collect::<Vec<_>>();
7324
7325            for input in node_inputs.into_iter() {
7326                inputs.insert(input);
7327            }
7328
7329            // 2.
7330            for edge in self.circuit.edges().dependencies_of(*node_id) {
7331                inputs.insert((None, edge.from));
7332            }
7333
7334            // 3.
7335            for edge in self.circuit.edges().depend_on(*node_id) {
7336                inputs.insert((None, edge.to));
7337            }
7338        }
7339
7340        let mut participate_in_backfill_new = BTreeSet::new();
7341
7342        let mut replay_streams = BTreeMap::new();
7343
7344        for (stream_id, mut node_id) in inputs.into_iter() {
7345            // println!("replay needed for ({stream_id}, {node_id})");
7346
7347            // Add all ancestors of `participate_in_backfill_new` to the `participate_in_backfill` set, except streams
7348            // that can be replayed from a different node.
7349            if let Some(stream_id) = stream_id
7350                && let Some(replay_source) = self.circuit.get_replay_source(stream_id)
7351            {
7352                // If the replay source is itself in the need_backfill set (i.e., it's an integral without
7353                // a checkpoint), it cannot be used for replay.
7354                if !need_backfill.contains(&replay_source.local_node_id()) {
7355                    replay_streams.insert(stream_id, replay_source.clone());
7356                    // trace!(
7357                    //     "worker {}: Replacing node_id {node_id} with replay source {}",
7358                    //     Runtime::worker_index(),
7359                    //     replay_source.origin_node_id()
7360                    // );
7361
7362                    // Replace the node_id with the replay source.
7363                    node_id = replay_source.local_node_id();
7364                }
7365            }
7366
7367            if !participate_in_backfill.contains(&node_id) {
7368                // println!("Adding {gid} to participate_in_backfill via {stream_id:?}");
7369                participate_in_backfill.insert(node_id);
7370                participate_in_backfill_new.insert(node_id);
7371            }
7372        }
7373
7374        // Connect `replay_streams` to all operators that consume the original stream.
7375        for (original_stream, replay_stream) in replay_streams.into_iter() {
7376            replay_sources
7377                .entry(replay_stream.local_node_id())
7378                .or_insert_with(|| {
7379                    self.circuit
7380                        .add_replay_edges(original_stream, replay_stream.as_ref());
7381                    replay_stream.stream_id()
7382                });
7383        }
7384
7385        Ok(participate_in_backfill_new)
7386    }
7387
7388    /// Returns `true` if all replay sources have completed their replay.
7389    pub fn is_replay_complete(&self) -> bool {
7390        let Some(replay_info) = self.replay_info.as_ref() else {
7391            return true;
7392        };
7393
7394        replay_info.replay_sources.keys().all(|node_id| {
7395            self.circuit
7396                .map_local_node_mut(*node_id, &mut |node| node.is_replay_complete())
7397        })
7398    }
7399
7400    /// Finalize the replay phase of the circuit.
7401    ///
7402    /// * Notify all replay sources to go back to normal operation.
7403    /// * Delete all replay streams.
7404    /// * Prepare the scheduler to run the full circuit.
7405    pub fn complete_replay(&mut self) -> Result<(), DbspError> {
7406        // info!("Replay complete");
7407
7408        let Some(replay_info) = self.replay_info.take() else {
7409            return Ok(());
7410        };
7411
7412        // End replay mode.
7413        for (node_id, stream_id) in replay_info.replay_sources.iter() {
7414            self.circuit
7415                .map_local_node_mut(*node_id, &mut |node| node.end_replay())?;
7416            self.circuit.edges_mut().delete_stream(*stream_id);
7417        }
7418
7419        // Prepare the scheduler to run the full circuit.
7420        self.executor.prepare(&self.circuit, None)?;
7421
7422        // if Runtime::worker_index() == 0 {
7423        //     self.circuit.to_dot_file(
7424        //         |_node| Some(crate::circuit::dot::DotNodeAttributes::new()),
7425        //         |edge| {
7426        //             let style = if edge.is_dependency() {
7427        //                 Some("dotted".to_string())
7428        //             } else {
7429        //                 None
7430        //             };
7431        //             let label = if let Some(stream) = &edge.stream {
7432        //                 Some(format!("consumers: {}", stream.num_consumers()))
7433        //             } else {
7434        //                 None
7435        //             };
7436        //             Some(
7437        //                 crate::circuit::dot::DotEdgeAttributes::new(edge.stream_id())
7438        //                     .with_style(style)
7439        //                     .with_label(label),
7440        //             )
7441        //         },
7442        //         "final.dot",
7443        //     );
7444        //     info!("CircuitHandle::restore: final circuit is written to final.dot");
7445        // }
7446
7447        Ok(())
7448    }
7449
7450    pub fn fingerprint(&self) -> u64 {
7451        let mut fip = Fingerprinter::default();
7452        let _ = self.circuit.map_nodes_recursive(&mut |node: &dyn Node| {
7453            node.fingerprint(&mut fip);
7454            Ok(())
7455        });
7456        fip.finish()
7457    }
7458
7459    /// Attach a scheduler event handler to the circuit.
7460    ///
7461    /// This method is identical to
7462    /// [`RootCircuit::register_scheduler_event_handler`], but it can be used at
7463    /// runtime, after the circuit has been fully constructed.
7464    ///
7465    /// Use [`RootCircuit::register_scheduler_event_handler`],
7466    /// [`RootCircuit::unregister_scheduler_event_handler`], to manipulate
7467    /// handlers during circuit construction.
7468    pub fn register_scheduler_event_handler<F>(&self, name: &str, handler: F)
7469    where
7470        F: FnMut(&SchedulerEvent<'_>) + 'static,
7471    {
7472        self.circuit.register_scheduler_event_handler(name, handler);
7473    }
7474
7475    /// Remove a scheduler event handler.
7476    ///
7477    /// This method is identical to
7478    /// [`RootCircuit::unregister_scheduler_event_handler`], but it can be used
7479    /// at runtime, after the circuit has been fully constructed.
7480    pub fn unregister_scheduler_event_handler(&self, name: &str) -> bool {
7481        self.circuit.unregister_scheduler_event_handler(name)
7482    }
7483
7484    /// Export circuit in LIR format.
7485    pub fn lir(&self) -> LirCircuit {
7486        (&self.circuit as &dyn CircuitBase).to_lir()
7487    }
7488
7489    pub fn set_balancer_hint(
7490        &self,
7491        global_node_id: &GlobalNodeId,
7492        hint: BalancerHint,
7493    ) -> Result<(), DbspError> {
7494        self.circuit.set_balancer_hint(global_node_id, hint)
7495    }
7496
7497    pub fn get_current_balancer_policy(&self) -> BTreeMap<GlobalNodeId, PartitioningPolicy> {
7498        self.circuit
7499            .get_current_balancer_policy()
7500            .into_iter()
7501            .map(|(node_id, policy)| (GlobalNodeId::root().child(node_id), policy))
7502            .collect()
7503    }
7504}
7505
7506#[cfg(test)]
7507mod tests {
7508    use crate::{
7509        Circuit, Error as DbspError, RootCircuit,
7510        circuit::schedule::{DynamicScheduler, Scheduler},
7511        monitor::TraceMonitor,
7512        operator::{Generator, Z1},
7513    };
7514    use anyhow::anyhow;
7515    use std::{cell::RefCell, ops::Deref, rc::Rc, vec::Vec};
7516
7517    #[test]
7518    fn sum_circuit_dynamic() {
7519        sum_circuit::<DynamicScheduler>();
7520    }
7521    // Compute the sum of numbers from 0 to 99.
7522    fn sum_circuit<S>()
7523    where
7524        S: Scheduler + 'static,
7525    {
7526        let actual_output: Rc<RefCell<Vec<isize>>> = Rc::new(RefCell::new(Vec::with_capacity(100)));
7527        let actual_output_clone = actual_output.clone();
7528        let circuit = RootCircuit::build_with_scheduler::<_, _, S>(|circuit| {
7529            TraceMonitor::new_panic_on_error().attach(circuit, "monitor");
7530            let mut n: isize = 0;
7531            let source = circuit.add_source(Generator::new(move || {
7532                let result = n;
7533                n += 1;
7534                result
7535            }));
7536            let integrator = source.integrate();
7537            integrator.inspect(|n| println!("{}", n));
7538            integrator.inspect(move |n| actual_output_clone.borrow_mut().push(*n));
7539            Ok(())
7540        })
7541        .unwrap()
7542        .0;
7543
7544        for _ in 0..100 {
7545            circuit.transaction().unwrap();
7546        }
7547
7548        let mut sum = 0;
7549        let mut expected_output: Vec<isize> = Vec::with_capacity(100);
7550        for i in 0..100 {
7551            sum += i;
7552            expected_output.push(sum);
7553        }
7554        assert_eq!(&expected_output, actual_output.borrow().deref());
7555    }
7556
7557    #[test]
7558    fn recursive_sum_circuit_dynamic() {
7559        recursive_sum_circuit::<DynamicScheduler>()
7560    }
7561
7562    fn recursive_sum_circuit<S>()
7563    where
7564        S: Scheduler + 'static,
7565    {
7566        let actual_output: Rc<RefCell<Vec<usize>>> = Rc::new(RefCell::new(Vec::with_capacity(100)));
7567        let actual_output_clone = actual_output.clone();
7568
7569        let circuit = RootCircuit::build_with_scheduler::<_, _, S>(|circuit| {
7570            TraceMonitor::new_panic_on_error().attach(circuit, "monitor");
7571
7572            let mut n: usize = 0;
7573            let source = circuit.add_source(Generator::new(move || {
7574                let result = n;
7575                n += 1;
7576                result
7577            }));
7578            let (z1_output, z1_feedback) = circuit.add_feedback(Z1::new(0));
7579            let plus = source
7580                .apply2(&z1_output, |n1: &usize, n2: &usize| *n1 + *n2)
7581                .inspect(move |n| actual_output_clone.borrow_mut().push(*n));
7582            z1_feedback.connect(&plus);
7583            Ok(())
7584        })
7585        .unwrap()
7586        .0;
7587
7588        for _ in 0..100 {
7589            circuit.transaction().unwrap();
7590        }
7591
7592        let mut sum = 0;
7593        let mut expected_output: Vec<usize> = Vec::with_capacity(100);
7594        for i in 0..100 {
7595            sum += i;
7596            expected_output.push(sum);
7597        }
7598        assert_eq!(&expected_output, actual_output.borrow().deref());
7599    }
7600
7601    #[test]
7602    fn factorial_dynamic() {
7603        factorial::<DynamicScheduler>();
7604    }
7605
7606    // Nested circuit.  The circuit contains a source node that counts up from
7607    // 1. For each `n` output by the source node, the nested circuit computes
7608    // factorial(n) using a `NestedSource` operator that counts from n down to
7609    // `1` and a multiplier that multiplies the next count by the product
7610    // computed so far (stored in z-1).
7611    fn factorial<S>()
7612    where
7613        S: Scheduler + 'static,
7614    {
7615        let actual_output: Rc<RefCell<Vec<usize>>> = Rc::new(RefCell::new(Vec::with_capacity(100)));
7616        let actual_output_clone = actual_output.clone();
7617
7618        let circuit = RootCircuit::build_with_scheduler::<_, _, S>(|circuit| {
7619            TraceMonitor::new_panic_on_error().attach(circuit, "monitor");
7620
7621            let mut n: usize = 0;
7622            let source = circuit.add_source(Generator::new(move || {
7623                n += 1;
7624                n
7625            }));
7626            let fact = circuit
7627                .iterate_with_condition_and_scheduler::<_, _, S>(|child| {
7628                    let mut counter = 0;
7629                    let countdown = source.delta0(child).apply_mut(move |parent_val| {
7630                        if *parent_val > 0 {
7631                            counter = *parent_val;
7632                        };
7633                        let res = counter;
7634                        counter -= 1;
7635                        res
7636                    });
7637                    let (z1_output, z1_feedback) = child.add_feedback_with_export(Z1::new(1));
7638                    let mul = countdown.apply2(&z1_output.local, |n1: &usize, n2: &usize| n1 * n2);
7639                    z1_feedback.connect(&mul);
7640                    Ok((countdown.condition(|n| *n <= 1), z1_output.export))
7641                })
7642                .unwrap();
7643            fact.inspect(move |n| actual_output_clone.borrow_mut().push(*n));
7644            Ok(())
7645        })
7646        .unwrap()
7647        .0;
7648
7649        for _ in 1..10 {
7650            circuit.transaction().unwrap();
7651        }
7652
7653        let mut expected_output: Vec<usize> = Vec::with_capacity(10);
7654        for i in 1..10 {
7655            expected_output.push(my_factorial(i));
7656        }
7657        assert_eq!(&expected_output, actual_output.borrow().deref());
7658    }
7659
7660    fn my_factorial(n: usize) -> usize {
7661        if n == 1 { 1 } else { n * my_factorial(n - 1) }
7662    }
7663
7664    #[test]
7665    fn init_circuit_constructor_error() {
7666        match RootCircuit::build(|_circuit| Err::<(), _>(anyhow!("constructor failed"))) {
7667            Err(DbspError::Constructor(msg)) => assert_eq!(msg.to_string(), "constructor failed"),
7668            _ => panic!(),
7669        }
7670    }
7671}