Skip to main content

dbsp/circuit/
circuit_builder.rs

1//! API to construct circuits.
2//!
3//! The API exposes two abstractions: "circuits" and "streams", where a circuit
4//! is a possibly nested dataflow graph that consists of operators connected by
5//! streams:
6//!
7//!   * Circuits are represented by the [`Circuit`] trait, which has a single
8//!     implementation [`ChildCircuit<P>`], where `P` is the type of the parent
9//!     circuit.  For a root circuit, `P` is `()`, so the API provides
10//!     [`RootCircuit`] as a synonym for `ChildCircuit<()>`.
11//!
12//!     Use [`RootCircuit::build`] to create a new root circuit.  It takes a
13//!     user-provided callback, which it calls to set up operators and streams
14//!     inside the circuit, and then it returns the circuit. The circuit's
15//!     structure is fixed at construction and can't be changed afterward.
16//!
17//!   * Streams are represented by struct [`Stream<C, D>`], which is a stream
18//!     that carries values of type `D` within a circuit of type `C`.  Methods
19//!     and traits on `Stream` are the main way to assemble the structure of a
20//!     circuit within the [`RootCircuit::build`] callback.
21//!
22//! The API that this directly exposes runs the circuit in the context of the
23//! current thread.  To instead run the circuit in a collection of worker
24//! threads, use [`Runtime::init_circuit`].
25use crate::{
26    Error as DbspError, Position, Runtime, RuntimeError,
27    circuit::{
28        cache::{CircuitCache, CircuitStoreMarker},
29        fingerprinter::Fingerprinter,
30        metadata::OperatorMeta,
31        metrics::DBSP_OPERATOR_COMMIT_LATENCY_MICROSECONDS,
32        operator_traits::{
33            BinaryOperator, BinarySinkOperator, Data, ImportOperator, NaryOperator,
34            QuaternaryOperator, SinkOperator, SourceOperator, StrictUnaryOperator, TernaryOperator,
35            TernarySinkOperator, UnaryOperator,
36        },
37        runtime::Consensus,
38        schedule::{
39            CommitProgress, DynamicScheduler, Error as SchedulerError, Executor, IterativeExecutor,
40            OnceExecutor, Scheduler,
41        },
42        trace::{CircuitEvent, SchedulerEvent},
43    },
44    circuit_cache_key,
45    ir::LABEL_MIR_NODE_ID,
46    operator::dynamic::balance::{Balancer, BalancerError, BalancerHint, PartitioningPolicy},
47    time::{Timestamp, UnitTimestamp},
48};
49#[cfg(doc)]
50use crate::{
51    InputHandle, OutputHandle,
52    algebra::{IndexedZSet, ZSet},
53    operator::{Aggregator, Fold, Generator, Max, Min, time_series::RelRange},
54    trace::Batch,
55};
56use anyhow::Error as AnyError;
57use dyn_clone::{DynClone, clone_box};
58use feldera_ir::{LirCircuit, LirNodeId};
59use feldera_samply::Span;
60use feldera_storage::{FileCommitter, StoragePath};
61use itertools::Itertools;
62use pin_project_lite::pin_project;
63use serde::{Deserialize, Serialize, Serializer, de::DeserializeOwned};
64use std::{
65    any::{Any, TypeId, type_name_of_val},
66    borrow::Cow,
67    cell::{Cell, Ref, RefCell, RefMut},
68    collections::{BTreeMap, BTreeSet, HashMap},
69    fmt::{self, Debug, Display, Write},
70    future::Future,
71    io::ErrorKind,
72    marker::PhantomData,
73    mem::{take, transmute},
74    ops::Deref,
75    panic::Location,
76    pin::Pin,
77    rc::Rc,
78    sync::Arc,
79    task::{Context, Poll},
80    thread::panicking,
81    time::{Duration, Instant},
82};
83use tokio::{runtime::Runtime as TokioRuntime, task::LocalSet};
84use tracing::debug;
85use typedmap::{TypedMap, TypedMapKey};
86
87use super::dbsp_handle::Mode;
88
/// Name of the label under which an operator's persistent id is recorded.
///
/// The persistent id identifies an operator in a way that stays stable
/// across modifications of the circuit.
const LABEL_PERSISTENT_OPERATOR_ID: &str = "persistent_id";
92
/// Value stored in the stream.
///
/// Holds at most one value at a time.  Every consumer reads the value (via
/// [`StreamValue::peek`] or [`StreamValue::take`]) and then reports that it is
/// done with [`StreamValue::consume_token`]; the last remaining consumer may
/// take ownership of the value instead of borrowing it.
struct StreamValue<D> {
    /// Value written to the stream at the current clock cycle;
    /// `None` after the last consumer has retrieved the value from the stream.
    val: Option<D>,

    /// The number of consumers connected to the stream.  Each consumer
    /// reads from the stream exactly once at every clock cycle.
    ///
    /// Controlled by the scheduler via `register_consumer` and `clear_consumer_count`.
    consumers: usize,

    /// The number of consumers that have not yet called
    /// [`StreamValue::consume_token`] at the current clock cycle.  Reset to
    /// `consumers` by [`StreamValue::put`] and decremented only by
    /// [`StreamValue::consume_token`]; reading through `peek` or `take` does
    /// not change it.  The last remaining consumer (the one that observes
    /// `tokens == 1`) can obtain an owned value via [`StreamValue::take`]
    /// rather than a borrow.  See description of
    /// [ownership-aware scheduling](`OwnershipPreference`) for details.
    tokens: Cell<usize>,
}

impl<D> StreamValue<D> {
    /// Creates a stream value with no contents, no consumers, and no tokens.
    const fn empty() -> Self {
        Self {
            val: None,
            consumers: 0,
            tokens: Cell::new(0),
        }
    }

    /// Writes the value for the current clock cycle.
    ///
    /// If at least one consumer is registered, stores `val` and resets the
    /// token count to the number of consumers.  Otherwise `val` is dropped
    /// immediately.
    fn put(&mut self, val: D) {
        // Check that the stream contents were consumed at the last clock cycle.
        // This isn't strictly necessary for correctness, but a leftover value
        // can indicate a scheduling or token counting error.
        debug_assert!(self.val.is_none());

        // If the stream is not connected to any consumers, drop the output
        // on the floor.
        if self.consumers > 0 {
            self.tokens.set(self.consumers);
            self.val = Some(val);
        }
    }

    /// Returns a reference to the value.
    ///
    /// # Panics
    ///
    /// Panics if the stream holds no value at this clock cycle.  In debug
    /// builds, also panics if no tokens remain.
    fn peek<R>(this: &R) -> &D
    where
        R: Deref<Target = Self>,
    {
        debug_assert_ne!(this.tokens.get(), 0);

        this.val
            .as_ref()
            .expect("StreamValue::peek: stream holds no value at this clock cycle")
    }

    /// Returns the owned value, leaving `this` empty, iff the number of
    /// remaining tokens is 1 (i.e., the caller is the last consumer); returns
    /// `None` otherwise.
    ///
    /// This does not consume a token: the caller must still call
    /// [`Self::consume_token`] once it is done with the stream.
    ///
    /// # Panics
    ///
    /// Panics if the last consumer finds the stream empty.  In debug builds,
    /// also panics if no tokens remain.
    fn take(this: &RefCell<Self>) -> Option<D>
    where
        D: Clone,
    {
        let tokens = this.borrow().tokens.get();
        debug_assert_ne!(tokens, 0);

        if tokens == 1 {
            Some(
                this.borrow_mut()
                    .val
                    .take()
                    .expect("StreamValue::take: last consumer found no value in the stream"),
            )
        } else {
            None
        }
    }

    /// Must be called exactly once by each consumer of the stream at each clock cycle,
    /// when the consumer has finished processing the contents of the stream. The consumer
    /// is not allowed to access the value after calling this function. This guarantees that,
    /// once the number of tokens drops to 1, only one active consumer remains and that
    /// consumer can retrieve the value using `Self::take`.
    ///
    /// When the last token is consumed, any value still in the stream is dropped.
    ///
    /// # Panics
    ///
    /// Panics if no tokens remain, i.e., if called more times than there are
    /// consumers at this clock cycle.
    fn consume_token(this: &RefCell<Self>) {
        let this_ref = this.borrow();
        // Checked in every build profile: a plain `tokens - 1` would silently
        // wrap to `usize::MAX` in release builds and hide a token counting error.
        let remaining = this_ref
            .tokens
            .get()
            .checked_sub(1)
            .expect("StreamValue::consume_token: no tokens left to consume");
        this_ref.tokens.set(remaining);
        if remaining == 0 {
            // We're the last consumer. It's now safe to take a mutable reference to `this`.
            drop(this_ref);
            this.borrow_mut().val.take();
        }
    }
}
179
180#[repr(transparent)]
181pub struct RefStreamValue<D>(Rc<RefCell<StreamValue<D>>>);
182
183impl<D> Clone for RefStreamValue<D> {
184    fn clone(&self) -> Self {
185        Self(self.0.clone())
186    }
187}
188
189impl<D> RefStreamValue<D> {
190    pub fn empty() -> Self {
191        Self(Rc::new(RefCell::new(StreamValue::empty())))
192    }
193
194    fn get_mut(&self) -> RefMut<'_, StreamValue<D>> {
195        self.0.borrow_mut()
196    }
197
198    fn get(&self) -> Ref<'_, StreamValue<D>> {
199        self.0.borrow()
200    }
201
202    /// Put a new value in the stream.
203    ///
204    /// # Panics
205    ///
206    /// Panics if someone is holding a reference to the stream value.
207    pub fn put(&self, d: D) {
208        let mut val = self.get_mut();
209        val.put(d);
210    }
211
212    unsafe fn transmute<D2>(&self) -> RefStreamValue<D2> {
213        unsafe {
214            RefStreamValue(std::mem::transmute::<
215                Rc<RefCell<StreamValue<D>>>,
216                Rc<RefCell<StreamValue<D2>>>,
217            >(self.0.clone()))
218        }
219    }
220}
221
222/// An object-safe interface to a stream.
223///
224/// The `Stream` type is parameterized with circuit and content types, and is not object-safe.
225/// This abstract trait abstracts away those types, allowing to pass streams around as trait objects.
226pub trait StreamMetadata: DynClone + 'static {
227    fn stream_id(&self) -> StreamId;
228    fn local_node_id(&self) -> NodeId;
229    fn origin_node_id(&self) -> &GlobalNodeId;
230    fn num_consumers(&self) -> usize;
231
232    /// Resets consumer count to 0.
233    fn clear_consumer_count(&self);
234
235    /// Invoked by the scheduler exactly once for each consumer operator attached
236    /// to the stream.
237    fn register_consumer(&self);
238
239    /// Invoked at each step once by each consumer of the stream.
240    /// When the token count drops to 1, the last consumer can retrieve the value using
241    /// `StreamValue::take`.
242    ///
243    /// Panics if called more times than there are tokens.
244    ///
245    /// All tokens must be consumed at each step; otherwise the circuit will panic
246    /// during the next step.
247    fn consume_token(&self);
248}
249
250dyn_clone::clone_trait_object!(StreamMetadata);
251
252/// A `Stream<C, D>` stores the output value of type `D` of an operator in a
253/// circuit with type `C`.
254///
255/// Circuits are synchronous, meaning that each value is produced and consumed
256/// in the same clock cycle, so there can be at most one value in the stream at
257/// any time.
258///
259/// The value type `D` may be any type, although most `Stream` methods impose
260/// additional requirements.  Since a stream must yield one data item per clock
261/// cycle, the rate at which data arrives is important to the choice of type.
262/// If, for example, an otherwise scalar input stream might not have new data on
263/// every cycle, an `Option` type could represent that, and to batch multiple
264/// pieces of data in a single step, one might use [`Vec`] or another collection
265/// type.
266///
267/// In practice, `D` is often a special collection type called an "indexed
268/// Z-set", represented as trait [`IndexedZSet`].  An indexed Z-set is
269/// conceptually a set of `(key, value, weight)` tuples.  Indexed Z-sets have a
270/// specialization called a "non-indexed Z-set" ([`ZSet`]) that contains `key`
271/// and `weight` only.  Indexed and non-indexed Z-sets are both subtraits of a
272/// higher-level [`Batch`] trait.  Many operators on streams work only with an
273/// indexed or non-indexed Z-set or another batch type as `D`.
274///
275/// # Data streams versus delta streams
276///
277/// A value in a `Stream` can represent data, or it can represent a delta (also
278/// called an "update").  Most streams carry data types that could have either
279/// meaning.  In particular, a stream of indexed or non-indexed Z-sets can carry
280/// either:
281///
282///   * In a stream of data, the `weight` indicates the multiplicity of a key. A
283///     negative `weight` has no natural interpretation and might indicate a
284///     bug.
285///
286///   * In a stream of deltas or updates, a positive `weight` represents
287///     insertions and a negative `weight` represents deletions.
288///
289/// There's no way to distinguish a data stream from a delta stream from just
290/// the type of the `Stream` since, as described above, a stream of Z-sets can
291/// be either one.  Some operators make sense for either kind of stream; for
292/// example, adding streams of Z-sets with [`plus`](`Stream::plus`) works
293/// equally well in either case, or even for adding a delta to data.  But other
294/// operations make sense only for streams of data or only for streams of
295/// deltas.  In these cases, `Stream` often provides an operator for each type
296/// of stream, and the programmer must choose the right one, since the types
297/// themselves don't help.
298///
299/// `Stream` refers to operators specifically for streams of data as
300/// "nonincremental".  These operators, which have `stream` in their name,
301/// e.g. `stream_join`, take streams of data as input and produce one as output.
302/// They act as "lifted scalar operators" that don't maintain state across
303/// invocations and only act on their immediate inputs, that is, each output is
304/// produced by independently applying the operator to the individual inputs
305/// received in the current step:
306///
307/// ```text
308///       ┌─────────────┐
309/// a ───►│    non-     │
310///       │ incremental ├───► c
311/// b ───►│  operator   │
312///       └─────────────┘
313/// ```
314///
315/// `Stream` refers to operators meant for streams of deltas as "incremental".
316/// These operators take streams of deltas as input and produce a stream of
317/// deltas as output.  Such operators could be implemented, inefficiently, in
318/// terms of a nonincremental version by putting an integration operator before
319/// each input and a differentiation operator after the output:
320///
321/// ```text
322///        ┌───┐      ┌─────────────┐
323/// Δa ───►│ I ├─────►│             │
324///        └───┘      │    non-     │    ┌───┐
325///                   │ incremental ├───►│ D ├───► Δc
326///        ┌───┐      │  operator   │    └───┘
327/// Δb ───►│ I ├─────►│             │
328///        └───┘      └─────────────┘
329/// ```
330///
331/// # Operator naming convention
332///
333/// `Stream` uses `_index` and `_generic` suffixes and
334/// `stream_` prefix to declare variations of basic operations, e.g., `map`,
335/// `map_index`, `map_generic`, `map_index_generic`, `join`, `stream_join`:
336///
337///   * `stream_` prefix: This prefix indicates that the operator is
338///     "nonincremental", that is, that it works with streams of data, rather
339///     than streams of deltas (see [Data streams versus delta streams], above).
340///
341///     [Data streams versus delta streams]:
342///     Stream#data-streams-versus-delta-streams
343///
344///   * `_generic` suffix: Most operators can assemble their outputs into any
345///     collection type that implements the [`Batch`] trait.  In practice, we
346///     typically use [`OrdIndexedZSet`](`crate::OrdIndexedZSet`) for indexed
347///     batches and [`OrdZSet`](`crate::OrdZSet`) for non-indexed batches.
348///     Methods without the `_generic` suffix return these concrete types,
349///     eliminating the need to type-annotate each invocation, while `_generic`
350///     methods can be used to return arbitrary custom `Batch` implementations.
351///
352///   * `_index` suffix: Methods without the `_index` suffix return non-indexed
353///     batches.  `<method>_index` methods combine the effects of `<method>` and
354///     [`index`](Self::index), e.g., `stream.map_index(closure)` is
355///     functionally equivalent, but more efficient than,
356///     `stream.map(closure).index()`.
357///
358/// # Catalog of stream operators
359///
360/// `Stream` methods are the main way to perform
361/// computations with streams.  The number of available methods can be
362/// overwhelming, so the subsections below categorize them into functionally
363/// related groups.
364///
365/// ## Input operators
366///
367/// Most streams are obtained via methods or traits that operate on `Stream`s.
368/// The input operators create the initial input streams for these other methods
369/// to work with.
370///
371/// [`Circuit::add_source`] is the fundamental way to add an input stream.
372/// Using it directly makes sense for cases like generating input using a
373/// function (perhaps using [`Generator`]) or reading data from a file.  More
374/// commonly, [`RootCircuit`] offers the `add_input_*` functions as convenience
375/// wrappers for `add_source`.  Each one returns a tuple of:
376///
377///   * A `Stream` that can be attached as input to operators in the circuit
378///     (within the constructor function passed to `RootCircuit::build` only).
379///
380///   * An input handle that can be used to add input to the stream from outside
381///     the circuit.  In a typical scenario, the closure passed to build will
382///     return all of its input handles, which are used at runtime to feed new
383///     inputs to the circuit at each step.  Different functions return
384///     different kinds of input handles.
385///
386/// Use [`RootCircuit::add_input_indexed_zset`] or
387/// [`RootCircuit::add_input_zset`] to create an (indexed) Z-set input
388/// stream. There's also [`RootCircuit::add_input_set`] and
389/// [`RootCircuit::add_input_map`] to simplify cases where a regular set or
390/// map is easier to use than a Z-set.  The latter functions maintain an extra
391/// internal table tracking the contents of the set or map, so they're a second
392/// choice.
393///
394/// For special cases, there's also [`RootCircuit::add_input_stream<T>`].  The
395/// [`InputHandle`] that it returns needs to be told which workers to feed the
396/// data to, which makes it harder to use.  It might be useful for feeding
397/// non-relational data to the circuit, such as the current physical time.  DBSP
398/// does not know how to automatically distribute such values across workers,
399/// so the caller must decide whether to send the value to one specific worker
400/// or to broadcast it to everyone.
401///
402/// It's common to pass explicit type arguments to the functions that
403/// create input streams, e.g.:
404///
405/// ```ignore
406/// circuit.add_input_indexed_zset::<KeyType, ValueType, WeightType>()
407/// ```
408///
409/// ## Output and debugging operators
410///
411/// There's not much value in computations whose output can't be seen in the
412/// outside world.  Use [`Stream::output`] to obtain an [`OutputHandle`] that
413/// exports a stream's data.  The constructor function passed to
414/// [`RootCircuit::build`] should return the `OutputHandle` (in addition to all
415/// the input handles as described above).  After each step, the client code
416/// should take the new data from the `OutputHandle`, typically by calling
417/// [`OutputHandle::consolidate`].
418///
419/// Use [`Stream::inspect`] to apply a callback function to each data item in a
420/// stream.  The callback function has no return value and is executed only for
421/// its side effects, such as printing the data item to stdout.  The `inspect`
422/// operator yields the same stream on its output.
423///
424/// It is not an operator, but [`Circuit::region`] can help with debugging by
425/// grouping operators into a named collection.
426///
427/// ## Record-by-record mapping operators
428///
429/// These operators map one kind of batch to another, allowing the client to
430/// pass in a function that looks at individual records in a Z-set or other
431/// batch.  These functions apply to both streams of deltas and streams of data.
432///
433/// The following methods are available for streams of indexed and non-indexed
434/// Z-sets.  Each of these takes a function that accepts a key (for non-indexed
435/// Z-sets) or a key-value pair (for indexed Z-sets):
436///
437///   * Use [`Stream::map`] to output a non-indexed Z-set using an
438///     arbitrary mapping function, or [`Stream::map_index`] to map to
439///     an indexed Z-set.
440///
441///   * Use [`Stream::filter`] to drop records that do not satisfy a
442///     predicate function.  The output stream has the same type as the input.
443///
444///   * Use [`Stream::flat_map`] to output a Z-set that maps each
445///     input record to any number of records, or
446///     [`Stream::flat_map_index`] for indexed Z-set output.  These
447///     methods also work as a `filter_map` equivalent.
448///
449/// ## Value-by-value mapping operators
450///
451/// Sometimes it makes sense to map a stream's whole data item instead of
452/// breaking Z-sets down into records.  Unlike the record-by-record functions,
453/// these functions work with streams that carry a type other than a Z-set or
454/// batch.  These functions apply to both streams of deltas and streams of data.
455///
456/// Use [`Stream::apply`] to apply a mapping function to each item in a given
457/// stream.  [`Stream::apply_named`], [`Stream::apply_owned`], and
458/// [`Stream::apply_owned_named`] offer small variations.
459///
460/// Use [`Stream::apply2`] or [`Stream::apply2_owned`] to apply a mapping
461/// function to pairs of items drawn from two different input streams.
462///
463/// ## Addition and subtraction operators
464///
465/// Arithmetic operators work on Z-sets (and batches) by operating on weights.
466/// For example, adding two Z-sets adds the weights of records with the same
467/// key-value pair.  They also work on streams of arithmetic types.
468///
469/// Use [`Stream::neg`] to map a stream to its negation, that is, to negate the
470/// weights for a Z-set.
471///
472/// Use [`Stream::plus`] to add two streams and [`Stream::sum`] to add an
473/// arbitrary number of streams.  Use [`Stream::minus`] to subtract streams.
474///
475/// There aren't any multiplication or division operators, because there is no
476/// clear interpretation of them for Z-sets.  You can use [`Stream::apply`] and
477/// [`Stream::apply2`], as already described, to do arbitrary arithmetic on one
478/// or two streams of arithmetic types.
479///
480/// ## Stream type conversion operators
481///
482/// These operators convert among streams of deltas, streams of data, and
483/// streams of "upserts".  Client code can use them, but they're more often
484/// useful for testing (for example, for checking that incremental operators are
485/// equivalent to non-incremental ones) or for building other operators.
486///
487/// Use [`Stream::integrate`] to sum up the values within an input stream.  The
488/// first output value is the first input value, the second output value is the
489/// sum of the first two inputs, and so on.  This effectively converts a stream
490/// of deltas into a stream of data.
491///
492/// [`Stream::stream_fold`] generalizes integration.  On each step, it calls a
493/// function to fold the input value with an accumulator and outputs the
494/// accumulator.  The client provides the function and the initial value of the
495/// accumulator.
496///
497/// Use [`Stream::differentiate`] to calculate differences between subsequent
498/// values in an input stream.  The first output is the first input value, the
499/// second output is the second input value minus the first input value, and so
500/// on.  This effectively converts a stream of data into a stream of deltas.
501/// You shouldn't ordinarily need this operator, at least not for Z-sets,
502/// because DBSP operators are fully incremental.
503///
504/// Use [`Stream::upsert`] to convert a stream of "upserts" into a stream of
505/// deltas.  The input stream carries "upserts", or assignments of values to
506/// keys such that a subsequent assignment to a key assigned earlier replaces
507/// the earlier value.  `upsert` turns these into a stream of deltas by
508/// internally tracking upserts that have already been seen.
509///
510/// ## Weighting and counting operators
511///
512/// Use [`Stream::dyn_weigh`] to multiply the weights in an indexed Z-set by a
513/// user-provided function of the key and value.  This is equally appropriate
514/// for streams of data or deltas.  This method outputs a non-indexed Z-set with
515/// just the keys from the input, discarding the values, which also means that
516/// weights will be added together in the case of equal input keys.
517///
518/// `Stream` provides methods to count the number of values per key in an
519/// indexed Z-set:
520///
521///   * To sum the weights for each value within a key, use
522///     [`Stream::dyn_weighted_count`] for streams of deltas or
523///     [`Stream::dyn_stream_weighted_count`] for streams of data.
524///
525///   * To count each value only once even for a weight greater than one, use
526///     [`Stream::dyn_distinct_count`] for streams of deltas or
527///     [`Stream::dyn_stream_distinct_count`] for streams of data.
528///
529/// The "distinct" operator on a Z-set maps positive weights to 1 and all other
530/// weights to 0.  `Stream` has two implementations:
531///
532///   * Use [`Stream::distinct`] to incrementally process a stream of deltas. If
533///     the output stream were to be integrated, it would only contain records with
534///     weight 0 or 1.  This operator internally integrates the stream of
535///     deltas, which means its memory consumption is proportional to the
536///     integrated data size.
537///
538///   * Use [`Stream::stream_distinct`] to non-incrementally process a stream of
539///     data.  It sets each record's weight to 1 if it is positive and drops the
540///     others.
541///
542/// ## Join on equal keys
543///
544/// A DBSP equi-join takes batches `a` and `b` as input, finds all pairs of a
545/// record in `a` and a record in `b` with the same key, applies a given
546/// function `F` to those records' key and values, and outputs a Z-set with
547/// `F`'s output.
548///
549/// DBSP implements two kinds of joins:
550///
551///   * Joins of delta streams ("incremental" joins) for indexed Z-sets with the
552///     same key type.  Use [`Stream::join`] for non-indexed Z-set output, or
553///     [`Stream::join_index`] for indexed Z-set output.
554///
555///   * Joins of data streams ("nonincremental" joins), which work with any
556///     indexed batches.  Use [`Stream::stream_join`], which outputs a
557///     non-indexed Z-set.
558///
559///     `stream_join` also works for joining a stream of deltas with an
560///     invariant stream of data where the latter is used as a lookup table.
561///
562///     If the output of the join function grows monotonically as `(k, v1, v2)`
563///     tuples are fed to it in lexicographic order, then
564///     [`Stream::monotonic_stream_join`] is more efficient.  One such monotonic
565///     function is a join function that returns `(k, v1, v2)` itself.
566///
567/// One way to implement a Cartesian product is to map unindexed Z-set inputs
568/// into indexed Z-sets with a unit key type, e.g. `input.index_with(|k| ((),
569/// k))`, and then use `join` or `stream_join`, as appropriate.
570///
571/// ## Other kinds of joins
572///
573/// Use [`Stream::antijoin`] for antijoins of delta streams.  It takes indexed
574/// Z-set `a` and Z-set `b` with the same key type and yields the subset of `a`
575/// whose keys do not appear in `b`.  `b` may be indexed or non-indexed and its
576/// value type does not matter.
577///
578/// Use [`Stream::dyn_semijoin_stream`] for semi-joins of data streams.  It
579/// takes a batch `a` and non-indexed batch `b` with the same key type as `a`.
580/// It outputs a non-indexed Z-set of key-value tuples that contains all the
581/// pairs from `a` for which a key appears in `b`.
582///
583/// Use [`Stream::outer_join`] or [`Stream::outer_join_default`] for outer joins
584/// of delta streams.  The former takes three functions, one for each of the
585/// cases (common keys, left key only, right key only), and the latter
586/// simplifies it by taking only a function for common keys and passing in the
587/// default for the missing value.
588///
589/// DBSP implements "range join" of data streams, which joins keys in `a`
590/// against ranges of keys in `b`.  [`Stream::dyn_stream_join_range`] implements
591/// range join with non-indexed Z-set output,
592/// [`Stream::dyn_stream_join_range_index`] with indexed output.
593///
594/// ## Aggregation
595///
596/// Aggregation applies a function (the "aggregation function") to all of the
597/// values for a given key in an input stream, and outputs an indexed Z-set with
598/// the same keys as the input and the function's output as values.  The output
599/// of aggregation usually has fewer records than its input, because it outputs
600/// only one record per input key, regardless of the number of key-value pairs
601/// with that key.
602///
603/// DBSP implements two kinds of aggregation:
604///
605///   * [`Stream::dyn_aggregate`] aggregates delta streams.  It takes an
606///     aggregation function as an [`Aggregator`], e.g. [`Min`], [`Max`],
607///     [`Fold`], or one written by the client.
608///
609///     [`Stream::dyn_aggregate_linear`] is cheaper for linear aggregation
610///     functions.  It's also a little easier to use with a custom aggregation
611///     function, because it only takes a function rather than an [`Aggregator`]
612///     object.
613///
614///     [`Stream::dyn_average`] calculates the average over the values for each
615///     key.
616///
617///   * [`Stream::dyn_stream_aggregate`] aggregates data streams.  Each batch
618///     from the input is separately aggregated and written to the output
619///     stream.
620///
621///     [`Stream::dyn_stream_aggregate_linear`] applies a linear aggregation
622///     function to a data stream.
623///
624/// These aggregation functions all partition the aggregation by key, like GROUP
625/// BY in SQL.  To aggregate all records in a non-indexed Z-set, map to an
626/// indexed Z-set with a unit key `()` before aggregating, then map again to
627/// remove the index if necessary, e.g.:
628///
629/// ```ignore
630/// let max_auction_count = auction_counts
631///     .map_index(|(_auction, count)| ((), *count))
632///     .aggregate(Max)
633///     .map(|((), max_count)| *max_count);
634/// ```
635///
636/// ## Rolling aggregates
637///
638/// DBSP supports rolling aggregation of time series data over a
639/// client-specified "rolling window" range.  For this purpose, Rust unsigned
640/// integer types model times, larger integers corresponding to later times.
641/// The unit of time in use is relevant only for specifying the width of the
642/// aggregation window, with [`RelRange`].
643///
644/// The DBSP logical time concept is unrelated to times used in rolling
645/// aggregation and other time-series operators. The former is used to establish
646/// the ordering in which updates are consumed by DBSP, while the latter model
647/// physical times when the corresponding events were generated, observed, or
648/// processed. In particular, the ordering of physical and logical timestamps
649/// doesn't have to match. In other words, DBSP can process events out-of-order.
650///
651/// Rolling aggregation takes place within a "partition", which is any
652/// convenient division of the data.  It might correspond to a tenant ID, for
653/// example, if each tenant's data is to be separately aggregated.  To represent
654/// partitioning, rolling aggregation introduces the
655/// [`OrdPartitionedIndexedZSet`](`crate::OrdPartitionedIndexedZSet`)
656/// type, which is an `IndexedZSet` with an arbitrary key type that specifies
657/// the partition (it may be `()` if all data is to be within a single
658/// partition) and a value type of the form `(TS, V)` where `TS` is the type
659/// used for time and `V` is the client's value type.
660///
661/// Rolling aggregation does not reduce the size of data.  It outputs one record
662/// for each input record.
663///
664/// DBSP has two kinds of rolling aggregation functions that differ based on
665/// their tolerance for updating aggregation results when new data arrives for
666/// an old moment in time:
667///
668///   * If the application must tolerate data arriving entirely out-of-order,
669///     use [`Stream::partitioned_rolling_aggregate`].  It operates on a
670///     `PartitionedIndexedZSet` and takes an [`Aggregator`] and a [`RelRange`]
671///     that specifies the span of the window.  It returns another
672///     `PartitionedIndexedZSet` with the results.  This operator must buffer
673///     old data indefinitely since old output is always subject to revision.
674///
675///     [`Stream::partitioned_rolling_aggregate_linear`] is cheaper for linear
676///     aggregation functions.
677///
678///     [`Stream::partitioned_rolling_average`] calculates the rolling average
679///     over a partition.
680///
681///   * If the application can discard data that arrives too out-of-order, use
682///     [`Stream::partitioned_rolling_aggregate_with_waterline`], which can be
683///     more memory-efficient.  This form of rolling aggregation requires a
684///     "waterline" stream, which is a stream of times (scalars, not batches or
685///     Z sets) that reports the earliest time that can be updated.  Use
686///     [`Stream::waterline_monotonic`] to conveniently produce the waterline
687///     stream.
688///
689///     [`Stream::partitioned_rolling_aggregate_with_waterline`] operates on an
///     `IndexedZSet` and, in addition to the aggregator, range, and waterline
691///     stream, it takes a function to map a record to a partition. It discards
692///     input before the waterline, partitions it, aggregates it, and returns
693///     the result as a `PartitionedIndexedZSet`.
694///
695/// ## Windowing
696///
697/// Use [`Stream::dyn_window`] to extract a stream of deltas to windows from a
698/// stream of deltas.  This can be useful for windowing outside the context of
699/// rolling aggregation.
pub struct Stream<C, D> {
    /// Globally unique ID of the stream.
    stream_id: StreamId,
    /// Id of the operator within the local circuit that writes to the stream.
    /// If the stream originates in a subcircuit, this is the id of the
    /// subcircuit node.
    local_node_id: NodeId,
    /// Global id of the node that writes to this stream.  For a stream that
    /// originates in a subcircuit, this identifies the operator inside that
    /// subcircuit (or one of its subcircuits).
    origin_node_id: GlobalNodeId,
    /// Circuit that this stream belongs to.
    circuit: C,
    /// Value stored in the stream (there can be at most one since our
    /// circuits are synchronous).  Cloning a `Stream` clones this handle
    /// rather than a `D` value.
    val: RefStreamValue<D>,
}
713
714impl<C, D> StreamMetadata for Stream<C, D>
715where
716    C: Clone + 'static,
717    D: 'static,
718{
719    fn stream_id(&self) -> StreamId {
720        self.stream_id
721    }
722    fn local_node_id(&self) -> NodeId {
723        self.local_node_id
724    }
725    fn origin_node_id(&self) -> &GlobalNodeId {
726        &self.origin_node_id
727    }
728    fn clear_consumer_count(&self) {
729        self.val.get_mut().consumers = 0;
730    }
731    fn num_consumers(&self) -> usize {
732        self.val.get().consumers
733    }
734    fn register_consumer(&self) {
735        self.val.get_mut().consumers += 1;
736    }
737    fn consume_token(&self) {
738        StreamValue::consume_token(self.val());
739    }
740}
741
742impl<C, D> Clone for Stream<C, D>
743where
744    C: Clone,
745{
746    fn clone(&self) -> Self {
747        Self {
748            stream_id: self.stream_id,
749            local_node_id: self.local_node_id,
750            origin_node_id: self.origin_node_id.clone(),
751            circuit: self.circuit.clone(),
752            val: self.val.clone(),
753        }
754    }
755}
756
757impl<C, D> Stream<C, D>
758where
759    C: Clone,
760{
761    /// Transmute a stream of `D` into a stream of `D2`.
762    ///
763    /// This is unsafe and dangerous for the same reasons [`std::mem:transmute`]
764    /// is dangerous and should be used with care.
765    ///
766    /// # Safety
767    ///
768    /// Transmuting `D` into `D2` should be safe.
769    pub(crate) unsafe fn transmute_payload<D2>(&self) -> Stream<C, D2> {
770        unsafe {
771            Stream {
772                stream_id: self.stream_id,
773                local_node_id: self.local_node_id,
774                origin_node_id: self.origin_node_id.clone(),
775                circuit: self.circuit.clone(),
776                val: self.val.transmute::<D2>(),
777            }
778        }
779    }
780}
781
782impl<C, D> Stream<C, D> {
783    /// Returns local node id of the operator or subcircuit that writes to
784    /// this stream.
785    ///
786    /// If the stream originates in a subcircuit, returns the id of the
787    /// subcircuit node.
788    pub fn local_node_id(&self) -> NodeId {
789        self.local_node_id
790    }
791
792    /// Returns global id of the operator that writes to this stream.
793    ///
794    /// If the stream originates in a subcircuit, returns id of the operator
795    /// inside the subcircuit (or one of its subcircuits) that produces the
796    /// contents of the stream.
797    pub fn origin_node_id(&self) -> &GlobalNodeId {
798        &self.origin_node_id
799    }
800
801    pub fn stream_id(&self) -> StreamId {
802        self.stream_id
803    }
804
805    /// Reference to the circuit the stream belongs to.
806    pub fn circuit(&self) -> &C {
807        &self.circuit
808    }
809
810    pub fn ptr_eq<D2>(&self, other: &Stream<C, D2>) -> bool {
811        self.stream_id() == other.stream_id()
812    }
813}
814
815// Internal streams API only used inside this module.
816impl<C, D> Stream<C, D>
817where
818    C: Circuit,
819{
820    /// Create a new stream within the given circuit, connected to the specified
821    /// node id.
822    pub(crate) fn new(circuit: C, node_id: NodeId) -> Self {
823        Self {
824            stream_id: circuit.allocate_stream_id(),
825            local_node_id: node_id,
826            origin_node_id: GlobalNodeId::child_of(&circuit, node_id),
827            circuit,
828            val: RefStreamValue::empty(),
829        }
830    }
831
832    /// Create a stream out of an existing [`RefStreamValue`] with `node_id` as the source.
833    pub fn with_value(circuit: C, node_id: NodeId, val: RefStreamValue<D>) -> Self {
834        Self {
835            stream_id: circuit.allocate_stream_id(),
836            local_node_id: node_id,
837            origin_node_id: GlobalNodeId::child_of(&circuit, node_id),
838            circuit,
839            val,
840        }
841    }
842
843    pub fn value(&self) -> RefStreamValue<D> {
844        self.val.clone()
845    }
846
847    /// Export stream to the parent circuit.
848    ///
849    /// Creates a stream in the parent circuit that contains the last value in
850    /// `self` when the child circuit terminates.
851    ///
852    /// This method currently only works for streams connected to a feedback
853    /// `Z1` operator and will panic for other streams.
854    pub fn export(&self) -> Stream<C::Parent, D>
855    where
856        C::Parent: Circuit,
857        D: 'static,
858    {
859        self.circuit()
860            .cache_get_or_insert_with(ExportId::new(self.stream_id()), || unimplemented!())
861            .clone()
862    }
863
864    /// Call `set_label` on the node that produces this stream.
865    pub fn set_label(&self, key: &str, val: &str) -> Self {
866        self.circuit.set_node_label(&self.origin_node_id, key, val);
867        self.clone()
868    }
869
870    /// Call `get_label` on the node that produces this stream.
871    pub fn get_label(&self, key: &str) -> Option<String> {
872        self.circuit.get_node_label(&self.origin_node_id, key)
873    }
874
875    /// Set persistent id for the operator that produces this stream.
876    pub fn set_persistent_id(&self, name: Option<&str>) -> Self {
877        if let Some(name) = name {
878            self.set_label(LABEL_PERSISTENT_OPERATOR_ID, name)
879        } else {
880            self.clone()
881        }
882    }
883
884    /// Get persistent id for the operator that produces this stream.
885    pub fn get_persistent_id(&self) -> Option<String> {
886        self.get_label(LABEL_PERSISTENT_OPERATOR_ID)
887    }
888}
889
890impl<C, D> Stream<C, D> {
891    /// Create a stream whose origin differs from its node id.
892    fn with_origin(
893        circuit: C,
894        stream_id: StreamId,
895        node_id: NodeId,
896        origin_node_id: GlobalNodeId,
897    ) -> Self {
898        Self {
899            stream_id,
900            local_node_id: node_id,
901            origin_node_id,
902            circuit,
903            val: RefStreamValue::empty(),
904        }
905    }
906}
907
908impl<C, D> Stream<C, D> {
909    pub(crate) fn map_value<T>(&self, f: impl Fn(&D) -> T) -> T {
910        f(StreamValue::peek(&self.get()))
911    }
912
913    fn get(&self) -> Ref<'_, StreamValue<D>> {
914        self.val.get()
915    }
916
917    fn val(&self) -> &RefCell<StreamValue<D>> {
918        &self.val.0
919    }
920
921    /// Puts a value in the stream, overwriting the previous value if any.
922    ///
923    /// # Panics
924    ///
925    /// The caller must have exclusive access to the current stream;
926    /// otherwise the method will panic.
927    pub(crate) fn put(&self, d: D) {
928        self.val.put(d);
929    }
930}
931
932/// Stream whose final value is exported to the parent circuit.
933///
934/// The struct bundles a pair of streams emitted by a
935/// [`StrictOperator`](`crate::circuit::operator_traits::StrictOperator`):
936/// a `local` stream inside operator's local circuit and an
937/// export stream available to the parent of the local circuit.
938/// The export stream contains the final value computed by the
939/// operator before `clock_end`.
940pub struct ExportStream<C, D>
941where
942    C: Circuit,
943{
944    pub local: Stream<C, D>,
945    pub export: Stream<C::Parent, D>,
946}
947
/// Relative location of a circuit in the hierarchy of nested circuits.
///
/// `0` refers to the local circuit that a given node or operator belongs
/// to, `1` to the parent of the local circuit, `2` to the parent's parent,
/// and so on.
pub type Scope = u16;
954
955/// Node in a circuit.  A node wraps an operator with strongly typed
956/// input and output streams.
957pub trait Node: Any {
958    /// Node id unique within its parent circuit.
959    fn local_id(&self) -> NodeId;
960
961    /// Global node id.
962    fn global_id(&self) -> &GlobalNodeId;
963
964    /// Persistent node id that remains stable across circuit restarts.  This Id can be used
965    /// to pick up operator state from a checkpoint after restart.
966    ///
967    /// * In ephemeral mode, this id is derived from the global node id. Such an Id can be used
968    ///   to recover the operator after a failure, when the circuit is identical to the one that was
969    ///   running before the failure.
970    ///
971    /// * In persistent mode, this id is derived from the operator's persistent Id assigned to it
972    ///   by the compiler during circuit construction.  This Id will remain stable across circuit
973    ///   restarts even if the circuit changes, as long as all ancestors of the node remain the
974    ///   same. The function will return `None` in persistent mode if the operator
975    ///   does not have a compiler-assigned persistent Id.
976    fn persistent_id(&self) -> Option<String> {
977        let worker_index = Runtime::worker_index();
978
979        match Runtime::mode() {
980            Mode::Ephemeral => Some(format!(
981                "{worker_index}-{}",
982                self.global_id().path_as_string()
983            )),
984            Mode::Persistent => self
985                .get_label(LABEL_PERSISTENT_OPERATOR_ID)
986                .map(|operator_id| format!("{worker_index}-{operator_id}")),
987        }
988    }
989
990    /// Operator name, e.g., "Map", "Join", etc.
991    fn name(&self) -> Cow<'static, str>;
992
993    fn is_circuit(&self) -> bool {
994        false
995    }
996
997    /// Is this an input node?
998    fn is_input(&self) -> bool;
999
1000    /// `true` if the node encapsulates an asynchronous operator (see
1001    /// [`Operator::is_async()`](super::operator_traits::Operator::is_async)).
1002    /// `false` for synchronous operators and subcircuits.
1003    fn is_async(&self) -> bool;
1004
1005    /// `true` if the node is ready to execute (see
1006    /// [`Operator::ready()`](super::operator_traits::Operator::ready)).
1007    /// Always returns `true` for synchronous operators and subcircuits.
1008    fn ready(&self) -> bool;
1009
1010    /// Register callback to be invoked when an asynchronous operator becomes
1011    /// ready (see
1012    /// [`super::operator_traits::Operator::register_ready_callback`]).
1013    fn register_ready_callback(&mut self, _cb: Box<dyn Fn() + Send + Sync>) {}
1014
1015    /// Evaluate the operator.  Reads one value from each input stream
1016    /// and pushes a new value to the output stream (except for sink
1017    /// operators, which don't have an output stream).
1018    fn eval<'a>(
1019        &'a mut self,
1020    ) -> Pin<Box<dyn Future<Output = Result<Option<Position>, SchedulerError>> + 'a>>;
1021
1022    fn import<'a>(&'a mut self) -> Pin<Box<dyn Future<Output = ()> + 'a>> {
1023        Box::pin(async {})
1024    }
1025
1026    /// Notify the operator that the circuit is starting a transaction.
1027    fn start_transaction(&mut self);
1028
1029    /// Notify the node about start of a transaction.
1030    /// Call `Operator::flush` on the operator.
1031    fn flush(&mut self);
1032
1033    /// Call `Operator::flush_complete` on the operator.
1034    fn is_flush_complete(&self) -> bool;
1035
1036    /// Notify the node about start of a clock epoch.
1037    ///
1038    /// The node should forward the notification to its inner operator.
1039    ///
1040    /// # Arguments
1041    ///
1042    /// * `scope` - the scope whose clock is restarting. A node gets notified
1043    ///   about clock events in its local circuit (scope 0) and all its
1044    ///   ancestors.
1045    fn clock_start(&mut self, scope: Scope);
1046
1047    /// Notify the node about the end of a clock epoch.
1048    ///
1049    /// The node should forward the notification to its inner operator.
1050    ///
1051    /// # Arguments
1052    ///
1053    /// * `scope` - the scope whose clock is ending.
1054    fn clock_end(&mut self, scope: Scope);
1055
1056    fn init(&mut self) {}
1057
1058    fn metadata(&self, output: &mut OperatorMeta);
1059
1060    fn fixedpoint(&self, scope: Scope) -> bool;
1061
1062    /// Invoke closure on all children of `self`, terminate on the first
1063    /// error.
1064    fn map_nodes_recursive(
1065        &self,
1066        _f: &mut dyn FnMut(&dyn Node) -> Result<(), DbspError>,
1067    ) -> Result<(), DbspError> {
1068        Ok(())
1069    }
1070
1071    /// Invoke closure on all children of `self`, terminate on the first
1072    /// error.
1073    fn map_nodes_recursive_mut(
1074        &self,
1075        _f: &mut dyn FnMut(&mut dyn Node) -> Result<(), DbspError>,
1076    ) -> Result<(), DbspError> {
1077        Ok(())
1078    }
1079
1080    /// Instructs the node to write the state of its inner operator to
1081    /// persistent storage within directory `base`.
1082    ///
1083    /// The node shouldn't commit the state to stable storage; rather, it should
1084    /// append the files to be committed to `files` for later commit.
1085    fn checkpoint(
1086        &mut self,
1087        base: &StoragePath,
1088        files: &mut Vec<Arc<dyn FileCommitter>>,
1089    ) -> Result<(), DbspError>;
1090
1091    /// Instructs the node to restore the state of its inner operator to
1092    /// the given checkpoint in directory `base`.
1093    fn restore(&mut self, base: &StoragePath) -> Result<(), DbspError>;
1094
1095    /// Reset the state of the operator to default.
1096    ///
1097    /// Used during replay, when the replay algorithm determines
1098    /// that the operator that may have previously picked up its state
1099    /// from a checkpoint must be backfilled from clean state.
1100    fn clear_state(&mut self) -> Result<(), DbspError>;
1101
1102    /// Call [`Operator::start_compaction`](super::operator_traits::Operator::start_compaction) on the operator this node encapsulates.
1103    fn start_compaction(&mut self);
1104
1105    /// Place operator in the replay mode.
1106    ///
1107    /// In the replay mode the operator streams its stored state to a temporary
1108    /// replay stream.
1109    ///
1110    /// # Panics
1111    ///
1112    /// Panics for operators that don't support replay.
1113    fn start_replay(&mut self) -> Result<(), DbspError>;
1114
1115    /// Check if the operator has finished replaying its stored state.
1116    ///
1117    /// # Panics
1118    ///
1119    /// Panics for operators that don't support replay.
1120    fn is_replay_complete(&self) -> bool;
1121
1122    /// Notify the operator that the circuit is exiting the replay mode.
1123    ///
1124    /// The operator can cleanup any state needed for replay at this point.
1125    ///
1126    /// # Panics
1127    ///
1128    /// Panics for operators that don't support replay.
1129    fn end_replay(&mut self) -> Result<(), DbspError>;
1130
1131    /// Takes a fingerprint of the node's inner operator adds it to `fip`.
1132    fn fingerprint(&self, fip: &mut Fingerprinter) {
1133        fip.hash(type_name_of_val(self));
1134    }
1135
1136    /// Tag the node with a text label.
1137    fn set_label(&mut self, key: &str, value: &str);
1138
1139    /// Get the label associated with the given key.
1140    fn get_label(&self, key: &str) -> Option<&str>;
1141
1142    fn labels(&self) -> &BTreeMap<String, String>;
1143
1144    /// Apply closure to a child node of `self`.
1145    fn map_child(&self, _path: &[NodeId], _f: &mut dyn FnMut(&dyn Node)) {
1146        panic!("map_child: not a circuit node")
1147    }
1148
1149    /// Apply closure to a child node of `self`.
1150    fn map_child_mut(&self, _path: &[NodeId], _f: &mut dyn FnMut(&mut dyn Node)) {
1151        panic!("map_child_mut: not a circuit node")
1152    }
1153
1154    fn as_circuit(&self) -> Option<&dyn CircuitBase> {
1155        None
1156    }
1157
1158    fn as_any(&self) -> &dyn Any;
1159}
1160
1161/// Globally unique id of a stream.
1162#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord, Hash, Deserialize, Serialize)]
1163#[repr(transparent)]
1164pub struct StreamId(usize);
1165
1166impl StreamId {
1167    pub fn new(id: usize) -> Self {
1168        Self(id)
1169    }
1170
1171    /// Extracts numeric representation of the stream id.
1172    pub fn id(&self) -> usize {
1173        self.0
1174    }
1175}
1176
1177impl Display for StreamId {
1178    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
1179        f.write_char('s')?;
1180        Debug::fmt(&self.0, f)
1181    }
1182}
1183
1184/// Id of an operator, guaranteed to be unique within a circuit.
1185#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord, Hash, Serialize, Deserialize)]
1186#[repr(transparent)]
1187pub struct NodeId(usize);
1188
1189impl NodeId {
1190    pub fn new(id: usize) -> Self {
1191        Self(id)
1192    }
1193
1194    /// Extracts numeric representation of the node id.
1195    pub fn id(&self) -> usize {
1196        self.0
1197    }
1198
1199    pub(super) fn root() -> Self {
1200        Self(0)
1201    }
1202}
1203
1204impl Display for NodeId {
1205    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
1206        f.write_char('n')?;
1207        Debug::fmt(&self.0, f)
1208    }
1209}
1210
/// A named circuit region.  Each `(name, id)` pair is expected to be unique;
/// the numeric `id` distinguishes regions that share the same name.
#[derive(Debug, Clone, Eq, Hash, PartialEq)]
pub struct RegionName {
    /// Disambiguates regions that share the same `name`.
    id: u64,
    pub name: Cow<'static, str>,
}

impl RegionName {
    /// Creates a region name, copying `name` into an owned string.
    fn new(name: &str, id: u64) -> Self {
        let name = Cow::Owned(String::from(name));
        Self { id, name }
    }

    /// Numeric identifier of this region.
    pub fn id(&self) -> u64 {
        self.id
    }

    /// Name of this region.
    pub fn name(&self) -> &str {
        self.name.as_ref()
    }
}

impl Display for RegionName {
    /// Renders the region as `name:id`, e.g., `join:3`.  Width and padding
    /// flags of the outer formatter are ignored.
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        write!(f, "{}:{}", self.name, self.id)
    }
}
1245
1246/// Globally unique id of a node (operator or subcircuit).
1247///
1248/// The identifier consists of a path from the top-level circuit to the node.
1249/// The top-level circuit has global id `[]`, an operator in the top-level
1250/// circuit or a sub-circuit nested inside the top-level circuit will have a
1251/// path of length 1, e.g., `[5]`, an operator inside the nested circuit
1252/// will have a path of length 2, e.g., `[5, 1]`, etc.
1253#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord, Hash)]
1254#[repr(transparent)]
1255pub struct GlobalNodeId(Vec<NodeId>);
1256
1257impl Serialize for GlobalNodeId {
1258    /// Serialize as a string containing all node ids
1259    fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
1260    where
1261        S: Serializer,
1262    {
1263        let s = self.node_identifier().to_string();
1264        serializer.serialize_str(&s)
1265    }
1266}
1267
1268impl Display for GlobalNodeId {
1269    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
1270        f.write_str("[")?;
1271        let path = self.path();
1272        for i in 0..path.len() {
1273            f.write_str(&path[i].0.to_string())?;
1274            if i < path.len() - 1 {
1275                f.write_str(".")?;
1276            }
1277        }
1278        f.write_str("]")
1279    }
1280}
1281
1282impl GlobalNodeId {
1283    /// Generate global node id from path.
1284    pub fn from_path(path: &[NodeId]) -> Self {
1285        Self(path.to_owned())
1286    }
1287
1288    /// Generate global node id from path.
1289    pub fn from_path_vec(path: Vec<NodeId>) -> Self {
1290        Self(path)
1291    }
1292
1293    pub fn root() -> Self {
1294        Self(Vec::new())
1295    }
1296
1297    /// Generate global node id by appending `child_id` to `self`.
1298    pub fn child(&self, child_id: NodeId) -> Self {
1299        let mut path = Vec::with_capacity(self.path().len() + 1);
1300        for id in self.path() {
1301            path.push(*id);
1302        }
1303        path.push(child_id);
1304        Self(path)
1305    }
1306
1307    /// Generate global node id for a child node of `circuit`.
1308    pub fn child_of<C>(circuit: &C, node_id: NodeId) -> Self
1309    where
1310        C: Circuit,
1311    {
1312        let mut ids = circuit.global_node_id().path().to_owned();
1313        ids.push(node_id);
1314        Self(ids)
1315    }
1316
1317    /// Generate unique name to use as a node label in a visual graph.
1318    pub fn node_identifier(&self) -> impl Display {
1319        struct NodeIdentifier<'a>(&'a GlobalNodeId);
1320        impl<'a> Display for NodeIdentifier<'a> {
1321            fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
1322                write!(f, "n{}", self.0.path().iter().format("_"))
1323            }
1324        }
1325        NodeIdentifier(self)
1326    }
1327
1328    /// Returns local node id of `self` or `None` if `self` is the root node.
1329    pub fn local_node_id(&self) -> Option<NodeId> {
1330        self.0.last().cloned()
1331    }
1332
1333    /// Returns parent id of `self` or `None` if `self` is the root node.
1334    pub fn parent_id(&self) -> Option<Self> {
1335        self.0
1336            .split_last()
1337            .map(|(_, prefix)| GlobalNodeId::from_path(prefix))
1338    }
1339
1340    /// Returns `true` if `self` is a child of `parent`.
1341    pub fn is_child_of(&self, parent: &Self) -> bool {
1342        self.parent_id().as_ref() == Some(parent)
1343    }
1344
1345    /// Get the path from global.
1346    pub fn path(&self) -> &[NodeId] {
1347        &self.0
1348    }
1349
1350    /// Ancestor of `self` in the top-level circuit.
1351    ///
1352    /// For a top-level node, its node id. For a nested node,
1353    /// returns the node id of the ancestor whose parent is the root node.
1354    ///
1355    /// # Panic
1356    ///
1357    /// Panics if `self` is the root node.
1358    pub fn top_level_ancestor(&self) -> NodeId {
1359        self.0[0]
1360    }
1361
1362    /// Convert to string in the `x-y-z` format.
1363    pub(crate) fn path_as_string(&self) -> String {
1364        self.0
1365            .iter()
1366            .map(|node_id| node_id.0.to_string())
1367            .collect::<Vec<_>>()
1368            .join("-")
1369    }
1370
1371    /// Format global node id as LIR node id.
1372    pub fn lir_node_id(&self) -> LirNodeId {
1373        LirNodeId::new(&self.path_as_string())
1374    }
1375}
1376
/// Boxed callback invoked with a reference to a [`CircuitEvent`].
type CircuitEventHandler = Box<dyn Fn(&CircuitEvent)>;
/// Boxed callback, which may mutate its own state, invoked with a reference
/// to a [`SchedulerEvent`].
type SchedulerEventHandler = Box<dyn FnMut(&SchedulerEvent<'_>)>;
/// Shared (`Rc`), interior-mutable (`RefCell`) map of circuit event handlers
/// keyed by `String`.
type CircuitEventHandlers = Rc<RefCell<HashMap<String, CircuitEventHandler>>>;
/// Shared (`Rc`), interior-mutable (`RefCell`) map of scheduler event
/// handlers keyed by `String`.
type SchedulerEventHandlers = Rc<RefCell<HashMap<String, SchedulerEventHandler>>>;
1381
/// An operator's preference for consuming its input data by value.
///
/// # Background
///
/// A stream in a circuit can feed multiple consumers, so in general it is
/// impossible to hand each consumer an owned copy of the data without cloning
/// it.  Yet many operators run faster on owned inputs.  For example, when
/// adding two z-sets, if one input is owned we can add the other z-set's
/// values to it in place instead of cloning it first.  If both inputs are
/// owned, we can also avoid cloning key/value pairs on insertion, and the
/// implementation is free to add the smaller z-set into the larger one.
///
/// # Ownership-aware scheduling
///
/// To exploit such optimizations we take a best-effort approach: operators
/// consume streaming data by value when possible and fall back to
/// pass-by-reference otherwise.  In a synchronous circuit every operator
/// reads each of its input streams exactly once per clock cycle.  The last
/// consumer in each cycle can therefore be identified and handed the owned
/// value from the channel.  The scheduler can also run operators that
/// strongly prefer owned values last.
///
/// Ownership preferences are captured at two levels.  First, each operator
/// that consumes one or more streams reports a per-stream preference through
/// an API method (e.g., [`UnaryOperator::input_preference`]).  Second, the
/// [`Circuit`] API lets the circuit builder override these preferences when
/// instantiating an operator, taking circuit topology and workload into
/// account.  Preferences are expressed as numeric values.
///
/// Preferences are attached to the edges of the circuit graph.  The
/// schedulers built so far implement only a limited form of ownership-aware
/// scheduling.  They consider only strong preferences
/// ([`OwnershipPreference::STRONGLY_PREFER_OWNED`] and above) and treat them
/// as hard constraints that must be satisfied for the circuit to be
/// schedulable.  Weaker preferences are ignored.
#[derive(Clone, Copy, PartialEq, Eq, PartialOrd, Ord, Debug, Hash)]
#[repr(transparent)]
pub struct OwnershipPreference(usize);

impl OwnershipPreference {
    /// The operator gains no speed-up from consuming an owned value.
    pub const INDIFFERENT: Self = Self(0);

    /// The operator is likely to run faster given an owned input, but should
    /// not be prioritized over operators that benefit more.
    ///
    /// This ranks below [`Self::PREFER_OWNED`] so that operators that need
    /// ownership can get it when available.
    pub const WEAKLY_PREFER_OWNED: Self = Self(40);

    /// The operator is likely to run faster given an owned input.
    ///
    /// Levels above `PREFER_OWNED` should not be used by operators; they are
    /// reserved for circuit builders, through the [`Circuit`] API.
    pub const PREFER_OWNED: Self = Self(50);

    /// The circuit will take a significant performance hit if the operator
    /// cannot consume the data in the channel by value.
    pub const STRONGLY_PREFER_OWNED: Self = Self(100);

    /// Creates a preference with numeric value `val`; higher values mean a
    /// stronger preference.
    pub const fn new(val: usize) -> Self {
        Self(val)
    }

    /// The numeric value of this preference.
    pub const fn raw(&self) -> usize {
        self.0
    }
}

impl Default for OwnershipPreference {
    /// Defaults to [`OwnershipPreference::INDIFFERENT`].
    #[inline]
    fn default() -> Self {
        OwnershipPreference::INDIFFERENT
    }
}

impl Display for OwnershipPreference {
    /// Named levels print as their name (e.g., `PreferOwned`); any other
    /// value prints as `Preference(<n>)`.
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        let label = match *self {
            Self::INDIFFERENT => "Indifferent",
            Self::WEAKLY_PREFER_OWNED => "WeaklyPreferOwned",
            Self::PREFER_OWNED => "PreferOwned",
            Self::STRONGLY_PREFER_OWNED => "StronglyPreferOwned",
            Self(other) => return write!(f, "Preference({other})"),
        };
        f.write_str(label)
    }
}
1476
1477/// An edge in a circuit graph represents a stream connecting two
1478/// operators or a dependency (i.e., a requirement that one operator
1479/// must be evaluated before the other even if they are not connected
1480/// by a stream).
1481#[derive(Clone)]
1482pub struct Edge {
1483    /// Source node.
1484    pub from: NodeId,
1485    /// Destination node.
1486    pub to: NodeId,
1487    /// Origin node that generates the stream.  If the origin belongs
1488    /// to the local circuit, this is just the full path to the `from`
1489    /// node.
1490    pub origin: GlobalNodeId,
1491    /// Stream associated with the edge, if any.  If `None`, this is a
1492    /// dependency edge.
1493    pub stream: Option<Box<dyn StreamMetadata>>,
1494    /// Ownership preference associated with the consumer of this
1495    /// stream or `None` if this is a dependency edge.
1496    pub ownership_preference: Option<OwnershipPreference>,
1497}
1498
1499#[allow(dead_code)]
1500impl Edge {
1501    /// `true` if `self` is a dependency edge.
1502    pub(crate) fn is_dependency(&self) -> bool {
1503        self.ownership_preference.is_none()
1504    }
1505
1506    /// `true` if `self` is a stream edge.
1507    pub(crate) fn is_stream(&self) -> bool {
1508        self.stream.is_some()
1509    }
1510
1511    pub(crate) fn stream_id(&self) -> Option<StreamId> {
1512        self.stream.as_ref().map(|meta| meta.stream_id())
1513    }
1514}
1515
// Circuit cache key mapping a stream id to the stream exported to the parent
// circuit for it; looked up by `Stream::export`.
circuit_cache_key!(ExportId<C, D>(StreamId => Stream<C, D>));

// `stream` => `replay_stream` mapping designates `replay_stream`
// as a replay source for `stream`.
circuit_cache_key!(ReplaySource(StreamId => Box<dyn StreamMetadata>));
1521
1522/// Register `replay_stream` as a replay source for `stream`.
1523pub(crate) fn register_replay_stream<C, B>(
1524    circuit: &C,
1525    stream: &Stream<C, B>,
1526    replay_stream: &Stream<C, B>,
1527) where
1528    C: Circuit,
1529    B: 'static,
1530{
1531    // We currently only support using operators in the top-level circuit
1532    // as replay sources.
1533    if TypeId::of::<()>() == TypeId::of::<C::Time>() {
1534        // If a replay source already exists, don't overwrite it. This normally shouldn't
1535        // happen as we should not have more than one integral for each stream. One situation
1536        // where this does happen today is for input streams that have an integral without
1537        // an accumulator as part of input_upsert, and another integral with an accumulator
1538        // created by a downstream join or aggregate. In this case, we want to use the former
1539        // for replay, as the latter may have been added in the new version of the program
1540        // and may be empty, while the former can have state (conversely, if the input integral
1541        // is empty, the downstream integral is guaranteed to be empty too).
1542        if !circuit.cache_contains(&ReplaySource::new(stream.stream_id())) {
1543            circuit.cache_insert(
1544                ReplaySource::new(stream.stream_id()),
1545                Box::new(replay_stream.clone()),
1546            );
1547        }
1548    }
1549}
1550
/// Trait for an object that has a clock associated with it.
///
/// This is implemented trivially for root circuits.  Every `ChildCircuit<P, T>`
/// uses its `T` parameter as the clock type, and `()` has a bootstrap
/// implementation so that the recursive definition for `ChildCircuit` bottoms
/// out.
pub trait WithClock {
    /// Timestamp type of the clock.
    ///
    /// `()` for a trivial zero-dimensional clock that doesn't need to count
    /// ticks.
    type Time: Timestamp;

    /// Current time.
    fn time(&self) -> Self::Time;
}
1561
1562/// This `impl` is only needed to bootstrap the
1563/// recursive definition of `WithClock` for `ChildCircuit`.
1564/// It is never actually used at runtime.
1565impl WithClock for () {
1566    type Time = UnitTimestamp;
1567
1568    fn time(&self) -> Self::Time {
1569        UnitTimestamp
1570    }
1571}
1572
1573impl<P, T> WithClock for ChildCircuit<P, T>
1574where
1575    P: 'static,
1576    T: Timestamp,
1577{
1578    type Time = T;
1579
1580    fn time(&self) -> Self::Time {
1581        self.time.borrow().clone()
1582    }
1583}
1584
// TODO: use a better type than serde_json::Value.
/// Metadata registered by the operators of a single worker, keyed by the
/// operator's `NodeId`.
///
/// Values are free-form JSON.  A `MetadataExchange` keeps one of these for
/// the local worker and one per worker in its global snapshot.
#[derive(Default, Debug, Clone, Serialize, Deserialize)]
pub struct CircuitMetadata {
    // Per-operator metadata value; operators without metadata have no entry.
    metadata: HashMap<NodeId, serde_json::Value>,
}
1590
/// Shared state behind a `MetadataExchange` handle.
///
/// Both fields use `RefCell` so the state can be updated through the shared
/// `Rc` held by every clone of the handle.
#[derive(Default, Debug)]
pub struct MetadataExchangeInner {
    /// Metadata registered by operators running in the current worker.
    local_metadata: RefCell<CircuitMetadata>,

    /// Metadata received from peers. All workers have identical metadata snapshots during a step.
    /// This is a vector of metadata snapshots from all workers, one for each worker.
    global_metadata: RefCell<Vec<CircuitMetadata>>,
}
1600
/// Metadata exchange.
///
/// Allows the worker to exchange arbitrary semi-structured data with its peers.
///
/// Every operator in the circuit can update its local metadata.
/// Before every step, the circuit broadcasts its metadata to all other workers
/// and receives their metadata. As a result all workers have identical metadata
/// snapshots during the step and can make deterministic decisions based on it, such
/// as choosing a balancing policy for a stream.
///
/// Cloning is cheap: all clones share the same underlying state through an `Rc`.
#[derive(Default, Debug, Clone)]
pub struct MetadataExchange {
    inner: Rc<MetadataExchangeInner>,
}
1614
1615impl MetadataExchange {
1616    fn new() -> Self {
1617        Self::default()
1618    }
1619
1620    /// Get the current snapshot of the local metadata registered by operators running in the current worker.
1621    pub fn local_metadata(&self) -> CircuitMetadata {
1622        self.inner.local_metadata.borrow().clone()
1623    }
1624
1625    /// Update the local metadata for the operator with the given id.
1626    pub fn set_local_operator_metadata(&self, id: NodeId, metadata: serde_json::Value) {
1627        self.inner
1628            .local_metadata
1629            .borrow_mut()
1630            .metadata
1631            .insert(id, metadata.clone());
1632    }
1633
1634    /// Clear the local metadata for the operator with the given id.
1635    pub fn clear_local_operator_metadata(&self, id: NodeId) {
1636        self.inner.local_metadata.borrow_mut().metadata.remove(&id);
1637    }
1638
1639    /// Update the local metadata for the operator with the given id by serializing `metadata` to a JSON value.
1640    pub fn set_local_operator_metadata_typed<T>(&self, id: NodeId, metadata: T)
1641    where
1642        T: Serialize,
1643    {
1644        self.inner
1645            .local_metadata
1646            .borrow_mut()
1647            .metadata
1648            .insert(id, serde_json::to_value(metadata).unwrap());
1649    }
1650
1651    /// Get the current snapshot of the local metadata for the operator with the given id.
1652    pub fn get_local_operator_metadata(&self, id: NodeId) -> Option<serde_json::Value> {
1653        self.inner
1654            .local_metadata
1655            .borrow()
1656            .metadata
1657            .get(&id)
1658            .cloned()
1659    }
1660
1661    pub fn get_local_operator_metadata_typed<T>(&self, id: NodeId) -> Option<T>
1662    where
1663        T: DeserializeOwned,
1664    {
1665        self.get_local_operator_metadata(id)
1666            .map(|val| serde_json::from_value::<T>(val).unwrap())
1667    }
1668
1669    /// Set the global metadata received from peers (invoked by the scheduler).
1670    pub fn set_global_metadata(&self, global_metadata: Vec<CircuitMetadata>) {
1671        *self.inner.global_metadata.borrow_mut() = global_metadata;
1672    }
1673
1674    pub fn get_global_metadata(&self) -> Vec<CircuitMetadata> {
1675        self.inner.global_metadata.borrow().clone()
1676    }
1677
1678    /// Get metadata for the operator with the given id received from all workers before the current step.
1679    pub fn get_global_operator_metadata(&self, id: NodeId) -> Vec<Option<serde_json::Value>> {
1680        self.inner
1681            .global_metadata
1682            .borrow()
1683            .iter()
1684            .map(|global_metadata| global_metadata.metadata.get(&id).cloned())
1685            .collect()
1686    }
1687
1688    /// Get metadata for the operator with the given id received from all workers before the current step.
1689    /// Deserialize it from a JSON value to a strongly typed representation `T`.
1690    ///
1691    /// # Panic
1692    ///
1693    /// Panics if the JSON value cannot be deserialized to a strongly typed representation `T`.
1694    pub fn get_global_operator_metadata_typed<T>(&self, id: NodeId) -> Vec<Option<T>>
1695    where
1696        T: DeserializeOwned,
1697    {
1698        self.inner
1699            .global_metadata
1700            .borrow()
1701            .iter()
1702            .map(|global_metadata| {
1703                global_metadata
1704                    .metadata
1705                    .get(&id)
1706                    .cloned()
1707                    .map(|val| serde_json::from_value::<T>(val).unwrap())
1708            })
1709            .collect()
1710    }
1711}
1712
/// An object-safe subset of the circuit API.
pub trait CircuitBase: 'static {
    /// Borrow the set of edges of this circuit.
    fn edges(&self) -> Ref<'_, Edges>;

    /// Mutably borrow the set of edges of this circuit.
    ///
    /// Takes `&self`: the edges are handed out through a `RefMut`, so the
    /// usual `RefCell` borrow rules apply.
    fn edges_mut(&self) -> RefMut<'_, Edges>;

    /// Global id of the circuit node.
    ///
    /// Returns [`GlobalNodeId::root()`] for the root circuit.
    fn global_id(&self) -> &GlobalNodeId;

    /// Number of nodes in the circuit.
    fn num_nodes(&self) -> usize;

    /// Returns vector of local node ids in the circuit.
    fn node_ids(&self) -> Vec<NodeId>;

    /// Look up a node of this circuit by its persistent operator id and
    /// return its global id.
    ///
    /// NOTE(review): presumably returns an error when no local node carries
    /// `persistent_id`; confirm the exact error conditions in the implementation.
    fn lookup_local_node_by_persistent_id(
        &self,
        persistent_id: &str,
    ) -> Result<GlobalNodeId, DbspError>;

    /// Ids of the import nodes of this circuit.
    ///
    /// NOTE(review): presumably the nodes evaluated by
    /// `Circuit::eval_import_node` to pull inputs from the parent circuit; confirm.
    fn import_nodes(&self) -> Vec<NodeId>;

    /// Clear the circuit.
    ///
    /// NOTE(review): which state is dropped is defined by the implementation;
    /// confirm before relying on it.
    fn clear(&mut self);

    /// Register a dependency between `from` and `to` nodes.  A dependency tells
    /// the scheduler that `from` must be evaluated before `to` in each
    /// clock cycle even though there may not be an edge or a path
    /// connecting them.
    fn add_dependency(&self, from: NodeId, to: NodeId);

    /// The set of transitive ancestors for each node in the circuit,
    /// i.e., the set of all nodes that precede it (transitively) in the `Edges` relationship.
    fn transitive_ancestors(&self) -> BTreeMap<NodeId, BTreeSet<NodeId>>;

    /// Allocate a new globally unique stream id.  This method can be invoked on any circuit in the pipeline,
    /// since all of them maintain a shared global counter.
    fn allocate_stream_id(&self) -> StreamId;

    /// The global stream id counter shared by all circuits.
    ///
    /// NOTE(review): this returns an owned `RefCell<StreamId>` rather than a
    /// reference; confirm whether callers observe the shared counter or a copy.
    fn last_stream_id(&self) -> RefCell<StreamId>;

    /// Relative depth of `self` from the root circuit.
    ///
    /// Returns 0 if `self` is the root circuit, 1 if `self` is an immediate
    /// child of the root circuit, etc.
    fn root_scope(&self) -> Scope;

    /// Circuit's node id within the parent circuit.
    fn node_id(&self) -> NodeId;

    /// Circuit's global node id.
    fn global_node_id(&self) -> GlobalNodeId;

    /// Apply `f` to the node with the specified `path` relative to `self`.
    fn map_node_relative(&self, path: &[NodeId], f: &mut dyn FnMut(&dyn Node));

    /// Apply `f` mutably to the node with the specified `path` relative to `self`.
    fn map_node_mut_relative(&self, path: &[NodeId], f: &mut dyn FnMut(&mut dyn Node));

    /// Recursively apply `f` to all nodes in `self` and its children.
    ///
    /// Stop at the first error.
    fn map_nodes_recursive(
        &self,
        f: &mut dyn FnMut(&dyn Node) -> Result<(), DbspError>,
    ) -> Result<(), DbspError>;

    /// Recursively apply `f` to all nodes in `self` and its children mutably.
    ///
    /// Stop at the first error.
    fn map_nodes_recursive_mut(
        &mut self,
        f: &mut dyn FnMut(&mut dyn Node) -> Result<(), DbspError>,
    ) -> Result<(), DbspError>;

    /// Apply `f` to all immediate children of `self`.
    ///
    /// Stop at the first error.
    fn map_local_nodes(
        &self,
        f: &mut dyn FnMut(&dyn Node) -> Result<(), DbspError>,
    ) -> Result<(), DbspError>;

    /// Apply `f` mutably to all immediate children of `self`.
    ///
    /// NOTE(review): presumably stops at the first error like
    /// `map_local_nodes`; confirm in the implementation.
    fn map_local_nodes_mut(
        &self,
        f: &mut dyn FnMut(&mut dyn Node) -> Result<(), DbspError>,
    ) -> Result<(), DbspError>;

    /// Apply closure `f` to a node with specified node id.
    ///
    /// # Panics
    ///
    /// Panics if `id` is not a valid Id of a node in `self`.
    fn apply_local_node_mut(&self, id: NodeId, f: &mut dyn FnMut(&mut dyn Node));

    /// Apply `f` to all immediate subcircuits of `self`.
    ///
    /// Stop at the first error.
    fn map_subcircuits(
        &self,
        f: &mut dyn FnMut(&dyn CircuitBase) -> Result<(), DbspError>,
    ) -> Result<(), DbspError>;

    /// Tag the node with a text label.
    ///
    /// # Panics
    ///
    /// Panics if `id` is not a valid Id of a node in `self` or one of its children or
    /// if there is another mutable or immutable reference to the node.
    fn set_node_label(&self, id: &GlobalNodeId, key: &str, val: &str);

    /// Set the persistent operator id label (`LABEL_PERSISTENT_OPERATOR_ID`)
    /// of the node when `persistent_id` is `Some`; does nothing when it is `None`.
    ///
    /// # Panics
    ///
    /// Same conditions as [`Self::set_node_label`].
    fn set_persistent_node_id(&self, id: &GlobalNodeId, persistent_id: Option<&str>) {
        if let Some(persistent_id) = persistent_id {
            self.set_node_label(id, LABEL_PERSISTENT_OPERATOR_ID, persistent_id);
        }
    }

    /// Set the MIR node id label (`LABEL_MIR_NODE_ID`) of the node when
    /// `mir_id` is `Some`; does nothing when it is `None`.
    ///
    /// # Panics
    ///
    /// Same conditions as [`Self::set_node_label`].
    fn set_mir_node_id(&self, id: &GlobalNodeId, mir_id: Option<&str>) {
        if let Some(mir_id) = mir_id {
            self.set_node_label(id, LABEL_MIR_NODE_ID, mir_id);
        }
    }

    /// Get node label.
    ///
    /// # Panics
    ///
    /// Panics if `id` is not a valid Id of a node in `self` or one of its children or
    /// if there is another mutable or immutable reference to the node.
    fn get_node_label(&self, id: &GlobalNodeId, key: &str) -> Option<String>;

    /// Node label for persistent operator id, or `None` if it was never set.
    fn get_persistent_node_id(&self, id: &GlobalNodeId) -> Option<String> {
        self.get_node_label(id, LABEL_PERSISTENT_OPERATOR_ID)
    }

    /// Check whether the circuit has reached a fixed point in `scope`.
    ///
    /// NOTE(review): semantics inferred from the name; confirm against the
    /// implementation.
    fn check_fixedpoint(&self, scope: Scope) -> bool;

    /// Return the metadata exchange object associated with the circuit.
    fn metadata_exchange(&self) -> &MetadataExchange;

    /// Return the balancer object associated with the circuit.
    fn balancer(&self) -> &Balancer;

    /// Set the auto-rebalancing flag for the circuit.
    fn set_auto_rebalance(&self, enable: bool) -> Result<(), DbspError>;

    /// Set the balancer hint for the operator with the given global node id.
    ///
    /// # Errors
    ///
    /// Returns an error if the operator with the given global node id is not found
    /// or if the hint contradicts the current balancer policy.
    fn set_balancer_hint_by_global_id(
        &self,
        global_node_id: &GlobalNodeId,
        hint: BalancerHint,
    ) -> Result<(), DbspError>;

    /// Set the balancer hint for the operator identified by `persistent_id`.
    ///
    /// NOTE(review): error conditions presumably mirror
    /// [`Self::set_balancer_hint_by_global_id`], plus an unknown persistent id; confirm.
    fn set_balancer_hint(&self, persistent_id: &str, hint: BalancerHint) -> Result<(), DbspError>;

    /// Get the current balancer policy for all streams managed by the balancer.
    fn get_current_balancer_policies(&self) -> BTreeMap<NodeId, PartitioningPolicy>;

    /// Get the current balancer policy for the operator identified by
    /// `persistent_id`.
    ///
    /// NOTE(review): error conditions (unknown id, operator not managed by the
    /// balancer) are not visible here; confirm.
    fn get_current_balancer_policy(
        &self,
        persistent_id: &str,
    ) -> Result<PartitioningPolicy, DbspError>;

    /// Trigger rebalancing.
    ///
    /// NOTE(review): whether this takes effect immediately or at the next step
    /// is defined by the implementation; confirm.
    fn rebalance(&self);

    /// Start compaction.
    ///
    /// NOTE(review): what is compacted and whether the call is asynchronous is
    /// not visible here; confirm in the implementation.
    fn start_compaction(&self);
}
1889
1890/// The circuit interface.  All DBSP computation takes place within a circuit.
1891///
1892/// Circuits can nest.  The nesting hierarchy must be known statically at
1893/// compile time via the `Parent` associated type, which must be `()` for a root
1894/// circuit and otherwise the parent circuit's type.
1895///
1896/// A circuit has a clock represented by the `Time` associated type obtained via
1897/// the `WithClock` supertrait.  For a root circuit, this is a trivial
1898/// zero-dimensional clock that doesn't need to count ticks.
1899///
1900/// There is only one implementation, [`ChildCircuit<P>`], whose `Parent` type
1901/// is `P`.  [`RootCircuit`] is a synonym for `ChildCircuit<()>`.
1902pub trait Circuit: CircuitBase + Clone + WithClock {
1903    /// Parent circuit type or `()` for the root circuit.
1904    type Parent;
1905
1906    /// Returns the parent circuit of `self`.
1907    fn parent(&self) -> Self::Parent;
1908
1909    /// Return the root of the circuit tree.
1910    fn root_circuit(&self) -> RootCircuit;
1911
1912    /// Check if `this` and `other` refer to the same circuit instance.
1913    fn ptr_eq(this: &Self, other: &Self) -> bool;
1914
1915    /// Returns circuit event handlers attached to the circuit.
1916    fn circuit_event_handlers(&self) -> CircuitEventHandlers;
1917
1918    /// Returns scheduler event handlers attached to the circuit.
1919    fn scheduler_event_handlers(&self) -> SchedulerEventHandlers;
1920
1921    /// Deliver `event` to all circuit event handlers.
1922    fn log_circuit_event(&self, event: &CircuitEvent);
1923
1924    /// Deliver `event` to all scheduler event handlers.
1925    fn log_scheduler_event(&self, event: &SchedulerEvent<'_>);
1926
1927    /// Apply closure `f` to a node with specified global id.
1928    ///
1929    /// # Panic
1930    ///
1931    /// Panics if `id` is not a valid Id of a node in `self` or one of its children or
1932    /// if there is another mutable reference to the node.
1933    fn map_node<T>(&self, id: &GlobalNodeId, f: &mut dyn FnMut(&dyn Node) -> T) -> T;
1934
1935    /// Apply closure `f` to a node with specified global id.
1936    ///
1937    /// # Panic
1938    ///
1939    /// Panics if `id` is not a valid Id of a node in `self` or one of its children or
1940    /// if there is another mutable or immutable reference to the node.
1941    fn map_node_mut<T>(&self, id: &GlobalNodeId, f: &mut dyn FnMut(&mut dyn Node) -> T) -> T;
1942
1943    /// Apply closure `f` to a node with specified node id.
1944    ///
1945    /// # Panic
1946    ///
1947    /// Panics if `id` is not a valid Id of a node in `self`.
1948    fn map_local_node_mut<T>(&self, id: NodeId, f: &mut dyn FnMut(&mut dyn Node) -> T) -> T;
1949
1950    /// Lookup a value in the circuit cache or create and insert a new value
1951    /// if it does not exist.
1952    ///
1953    /// See [`cache`](`crate::circuit::cache`) module documentation for details.
1954    fn cache_get_or_insert_with<K, F>(&self, key: K, f: F) -> RefMut<'_, K::Value>
1955    where
1956        K: 'static + TypedMapKey<CircuitStoreMarker>,
1957        F: FnMut() -> K::Value;
1958
1959    /// Invoked by the scheduler at the end of a clock cycle, after all circuit
1960    /// operators have been evaluated.
1961    fn tick(&self);
1962
1963    /// Deliver `clock_start` notification to all nodes in the circuit.
1964    fn clock_start(&self, scope: Scope);
1965
1966    /// Deliver `clock_end` notification to all nodes in the circuit.
1967    fn clock_end(&self, scope: Scope);
1968
1969    /// `true` if the specified node is ready to execute (see
1970    /// [`Operator::ready()`](super::operator_traits::Operator::ready)).
1971    fn ready(&self, id: NodeId) -> bool;
1972
1973    /// Insert a value to the circuit cache, overwriting any existing value.
1974    ///
1975    /// See [`cache`](`crate::circuit::cache`) module documentation for
1976    /// details.
1977    fn cache_insert<K>(&self, key: K, val: K::Value)
1978    where
1979        K: TypedMapKey<CircuitStoreMarker> + 'static;
1980
1981    fn cache_contains<K>(&self, key: &K) -> bool
1982    where
1983        K: TypedMapKey<CircuitStoreMarker> + 'static;
1984
1985    fn cache_get<K>(&self, key: &K) -> Option<K::Value>
1986    where
1987        K: TypedMapKey<CircuitStoreMarker> + 'static,
1988        K::Value: Clone;
1989
1990    /// Check if a stream can be replayed, if so, return the replay stream that can be used to
1991    /// replay the contents of `stream_id`.
1992    fn get_replay_source(&self, stream_id: StreamId) -> Option<Box<dyn StreamMetadata>> {
1993        self.cache_get(&ReplaySource::new(stream_id))
1994    }
1995
1996    /// For every edge in `self` with stream id equal to `stream_id`, create an additional replay edge with
1997    /// the same destination node attached to `replay_stream`.
1998    fn add_replay_edges(&self, stream_id: StreamId, replay_stream: &dyn StreamMetadata);
1999
2000    /// Connect `stream` as input to `to`.
2001    fn connect_stream<T: 'static>(
2002        &self,
2003        stream: &Stream<Self, T>,
2004        to: NodeId,
2005        ownership_preference: OwnershipPreference,
2006    );
2007
2008    fn register_ready_callback(&self, id: NodeId, cb: Box<dyn Fn() + Send + Sync>);
2009
2010    fn is_async_node(&self, id: NodeId) -> bool;
2011
2012    /// Evaluate operator with the given id.
2013    ///
2014    /// This method should only be used by schedulers.
2015    fn eval_node(
2016        &self,
2017        id: NodeId,
2018    ) -> impl Future<Output = Result<Option<Position>, SchedulerError>>;
2019
2020    /// Evaluate import node to pull inputs from the parent circuit.
2021    fn eval_import_node(&self, id: NodeId) -> impl Future<Output = ()>;
2022
2023    fn flush_node(&self, id: NodeId);
2024
2025    fn is_flush_complete(&self, id: NodeId) -> bool;
2026
2027    /// Evaluate closure `f` inside a new circuit region.
2028    ///
2029    /// A region is a logical grouping of circuit nodes.  Regions are used
2030    /// exclusively for debugging and do not affect scheduling or evaluation
2031    /// of the circuit.  This function creates a new region and executes
2032    /// closure `f` inside it.  Any operators or subcircuits created by
2033    /// `f` will belong to the new region.
2034    #[track_caller]
2035    fn region<F, T>(&self, name: &str, f: F) -> T
2036    where
2037        F: FnOnce() -> T;
2038
2039    /// Create a named region identifier for incremental region construction.
2040    /// 'name' is the name displayed for the region.
2041    /// 'id' is an identifier which distinguishes between regions with the same name.
2042    ///
2043    /// Returns a [`RegionName`] that can be passed to [`Circuit::open_region`]
2044    /// and [`Circuit::close_region`] to associate operators with the region
2045    /// from multiple call sites.
2046    fn create_region_name(&self, name: &str, id: u64) -> RegionName {
2047        RegionName::new(name, id)
2048    }
2049
2050    /// Open the circuit region identified by `name`.
2051    ///
2052    /// Emits an `OpenRegion` event.  Every operator or subcircuit created after
2053    /// this call (in the same thread) will be inserted in the named region until
2054    /// [`Circuit::close_region`] is called with the same `name`.  Saves the previously
2055    /// active region.
2056    ///
2057    /// # Panics
2058    ///
2059    /// The caller is responsible for pairing every `open_region` with a
2060    /// matching `close_region`; failing to do so will leave the region stack
2061    /// in an inconsistent state.
2062    #[track_caller]
2063    fn open_region(&self, name: RegionName);
2064
2065    /// Close the circuit region identified by `name`.  Restores the previously
2066    /// active region saved by [`Circuit::open_region`].
2067    ///
2068    /// Emits a `CloseRegion` event which should use the same name as the most recent
2069    /// [`Circuit::open_region`] call.
2070    fn close_region(&self, name: RegionName);
2071
2072    /// Add a dependency from `preprocessor_node_id` to all input operators in the
2073    /// circuit, making sure that this node and all its predecessors
2074    /// are evaluated before the rest of the circuit.
2075    fn add_preprocessor(&self, preprocessor_node_id: NodeId);
2076
2077    /// Add a source operator to the circuit.  See [`SourceOperator`].
2078    fn add_source<O, Op>(&self, operator: Op) -> Stream<Self, O>
2079    where
2080        O: Data,
2081        Op: SourceOperator<O>;
2082
2083    /// Add a pair of operators that implement cross-worker communication.
2084    ///
2085    /// Operators that exchange data across workers are split into two
2086    /// operators: the **sender** responsible for partitioning values read
2087    /// from the input stream and distributing them across workers and the
2088    /// **receiver**, which receives and reassembles data received from its
2089    /// peers.  Splitting communication into two halves allows the scheduler
2090    /// to schedule useful work in between them instead of blocking to wait
2091    /// for the receiver.
2092    ///
2093    /// Exchange operators use some form of IPC or shared memory instead of
2094    /// streams to communicate.  Therefore, the sender must implement trait
2095    /// [`SinkOperator`], while the receiver implements [`SourceOperator`].
2096    ///
2097    /// This function adds both operators to the circuit and registers a
2098    /// dependency between them, making sure that the scheduler will
2099    /// evaluate the sender before the receiver even though there is no
2100    /// explicit stream connecting them.
2101    ///
2102    /// Returns the output stream produced by the receiver operator.
2103    ///
2104    /// # Arguments
2105    ///
2106    /// * `sender` - the sender half of the pair.  The sender must be a sink
2107    ///   operator
2108    /// * `receiver` - the receiver half of the pair.  Must be a source
2109    /// * `input_stream` - stream to connect as input to the `sender`.
2110    fn add_exchange<I, SndOp, O, RcvOp>(
2111        &self,
2112        sender: SndOp,
2113        receiver: RcvOp,
2114        input_stream: &Stream<Self, I>,
2115    ) -> Stream<Self, O>
2116    where
2117        I: Data,
2118        O: Data,
2119        SndOp: SinkOperator<I>,
2120        RcvOp: SourceOperator<O>;
2121
2122    /// Like [`Self::add_exchange`], but overrides the ownership
2123    /// preference on the input stream with `input_preference`.
2124    fn add_exchange_with_preference<I, SndOp, O, RcvOp>(
2125        &self,
2126        sender: SndOp,
2127        receiver: RcvOp,
2128        input_stream: &Stream<Self, I>,
2129        input_preference: OwnershipPreference,
2130    ) -> Stream<Self, O>
2131    where
2132        I: Data,
2133        O: Data,
2134        SndOp: SinkOperator<I>,
2135        RcvOp: SourceOperator<O>;
2136
2137    /// Add a sink operator (see [`SinkOperator`]).
2138    fn add_sink<I, Op>(&self, operator: Op, input_stream: &Stream<Self, I>) -> GlobalNodeId
2139    where
2140        I: Data,
2141        Op: SinkOperator<I>;
2142
2143    /// Like [`Self::add_sink`], but overrides the ownership preference on the
2144    /// input stream with `input_preference`.
2145    fn add_sink_with_preference<I, Op>(
2146        &self,
2147        operator: Op,
2148        input_stream: &Stream<Self, I>,
2149        input_preference: OwnershipPreference,
2150    ) -> GlobalNodeId
2151    where
2152        I: Data,
2153        Op: SinkOperator<I>;
2154
2155    /// Add a binary sink operator (see [`BinarySinkOperator`]).
2156    fn add_binary_sink<I1, I2, Op>(
2157        &self,
2158        operator: Op,
2159        input_stream1: &Stream<Self, I1>,
2160        input_stream2: &Stream<Self, I2>,
2161    ) where
2162        I1: Data,
2163        I2: Data,
2164        Op: BinarySinkOperator<I1, I2>;
2165
2166    /// Like [`Self::add_binary_sink`], but overrides the ownership preferences
2167    /// on both input streams with `input_preference1` and
2168    /// `input_preference2`.
2169    fn add_binary_sink_with_preference<I1, I2, Op>(
2170        &self,
2171        operator: Op,
2172        input_stream1: (&Stream<Self, I1>, OwnershipPreference),
2173        input_stream2: (&Stream<Self, I2>, OwnershipPreference),
2174    ) where
2175        I1: Data,
2176        I2: Data,
2177        Op: BinarySinkOperator<I1, I2>;
2178
2179    /// Add a ternary sink operator (see [`TernarySinkOperator`]).
2180    fn add_ternary_sink<I1, I2, I3, Op>(
2181        &self,
2182        operator: Op,
2183        input_stream1: &Stream<Self, I1>,
2184        input_stream2: &Stream<Self, I2>,
2185        input_stream3: &Stream<Self, I3>,
2186    ) -> GlobalNodeId
2187    where
2188        I1: Data,
2189        I2: Data,
2190        I3: Data,
2191        Op: TernarySinkOperator<I1, I2, I3>;
2192
2193    /// Like [`Self::add_ternary_sink`], but overrides the ownership preferences
2194    /// on input streams.
2195    fn add_ternary_sink_with_preference<I1, I2, I3, Op>(
2196        &self,
2197        operator: Op,
2198        input_stream1: (&Stream<Self, I1>, OwnershipPreference),
2199        input_stream2: (&Stream<Self, I2>, OwnershipPreference),
2200        input_stream3: (&Stream<Self, I3>, OwnershipPreference),
2201    ) -> GlobalNodeId
2202    where
2203        I1: Data,
2204        I2: Data,
2205        I3: Data,
2206        Op: TernarySinkOperator<I1, I2, I3>;
2207
2208    /// Add a unary operator (see [`UnaryOperator`]).
2209    fn add_unary_operator<I, O, Op>(
2210        &self,
2211        operator: Op,
2212        input_stream: &Stream<Self, I>,
2213    ) -> Stream<Self, O>
2214    where
2215        I: Data,
2216        O: Data,
2217        Op: UnaryOperator<I, O>;
2218
2219    /// Like [`Self::add_unary_operator`], but overrides the ownership
2220    /// preference on the input stream with `input_preference`.
2221    fn add_unary_operator_with_preference<I, O, Op>(
2222        &self,
2223        operator: Op,
2224        input_stream: &Stream<Self, I>,
2225        input_preference: OwnershipPreference,
2226    ) -> Stream<Self, O>
2227    where
2228        I: Data,
2229        O: Data,
2230        Op: UnaryOperator<I, O>;
2231
2232    /// Add a binary operator (see [`BinaryOperator`]).
2233    fn add_binary_operator<I1, I2, O, Op>(
2234        &self,
2235        operator: Op,
2236        input_stream1: &Stream<Self, I1>,
2237        input_stream2: &Stream<Self, I2>,
2238    ) -> Stream<Self, O>
2239    where
2240        I1: Data,
2241        I2: Data,
2242        O: Data,
2243        Op: BinaryOperator<I1, I2, O>;
2244
2245    /// Like [`Self::add_binary_operator`], but overrides the ownership
2246    /// preference on both input streams with `input_preference1` and
2247    /// `input_preference2` respectively.
2248    fn add_binary_operator_with_preference<I1, I2, O, Op>(
2249        &self,
2250        operator: Op,
2251        input_stream1: (&Stream<Self, I1>, OwnershipPreference),
2252        input_stream2: (&Stream<Self, I2>, OwnershipPreference),
2253    ) -> Stream<Self, O>
2254    where
2255        I1: Data,
2256        I2: Data,
2257        O: Data,
2258        Op: BinaryOperator<I1, I2, O>;
2259
2260    /// Add a ternary operator (see [`TernaryOperator`]).
2261    fn add_ternary_operator<I1, I2, I3, O, Op>(
2262        &self,
2263        operator: Op,
2264        input_stream1: &Stream<Self, I1>,
2265        input_stream2: &Stream<Self, I2>,
2266        input_stream3: &Stream<Self, I3>,
2267    ) -> Stream<Self, O>
2268    where
2269        I1: Data,
2270        I2: Data,
2271        I3: Data,
2272        O: Data,
2273        Op: TernaryOperator<I1, I2, I3, O>;
2274
2275    /// Like [`Self::add_ternary_operator`], but overrides the ownership
2276    /// preference on the input streams with specified values.
2277    #[allow(clippy::too_many_arguments)]
2278    fn add_ternary_operator_with_preference<I1, I2, I3, O, Op>(
2279        &self,
2280        operator: Op,
2281        input_stream1: (&Stream<Self, I1>, OwnershipPreference),
2282        input_stream2: (&Stream<Self, I2>, OwnershipPreference),
2283        input_stream3: (&Stream<Self, I3>, OwnershipPreference),
2284    ) -> Stream<Self, O>
2285    where
2286        I1: Data,
2287        I2: Data,
2288        I3: Data,
2289        O: Data,
2290        Op: TernaryOperator<I1, I2, I3, O>;
2291
2292    /// Add a quaternary operator (see [`QuaternaryOperator`]).
2293    fn add_quaternary_operator<I1, I2, I3, I4, O, Op>(
2294        &self,
2295        operator: Op,
2296        input_stream1: &Stream<Self, I1>,
2297        input_stream2: &Stream<Self, I2>,
2298        input_stream3: &Stream<Self, I3>,
2299        input_stream4: &Stream<Self, I4>,
2300    ) -> Stream<Self, O>
2301    where
2302        I1: Data,
2303        I2: Data,
2304        I3: Data,
2305        I4: Data,
2306        O: Data,
2307        Op: QuaternaryOperator<I1, I2, I3, I4, O>;
2308
2309    /// Like [`Self::add_quaternary_operator`], but overrides the ownership
2310    /// preference on the input streams with specified values.
2311    #[allow(clippy::too_many_arguments)]
2312    fn add_quaternary_operator_with_preference<I1, I2, I3, I4, O, Op>(
2313        &self,
2314        operator: Op,
2315        input_stream1: (&Stream<Self, I1>, OwnershipPreference),
2316        input_stream2: (&Stream<Self, I2>, OwnershipPreference),
2317        input_stream3: (&Stream<Self, I3>, OwnershipPreference),
2318        input_stream4: (&Stream<Self, I4>, OwnershipPreference),
2319    ) -> Stream<Self, O>
2320    where
2321        I1: Data,
2322        I2: Data,
2323        I3: Data,
2324        I4: Data,
2325        O: Data,
2326        Op: QuaternaryOperator<I1, I2, I3, I4, O>;
2327
2328    /// Add a N-ary operator (see [`NaryOperator`]).
2329    fn add_nary_operator<'a, I, O, Op, Iter>(
2330        &'a self,
2331        operator: Op,
2332        input_streams: Iter,
2333    ) -> Stream<Self, O>
2334    where
2335        I: Data,
2336        O: Data,
2337        Op: NaryOperator<I, O>,
2338        Iter: IntoIterator<Item = &'a Stream<Self, I>>;
2339
    /// Like [`Self::add_nary_operator`], but overrides the ownership
    /// preference with `input_preference`.
    ///
    /// The single `input_preference` applies to every stream in
    /// `input_streams`.
    fn add_nary_operator_with_preference<'a, I, O, Op, Iter>(
        &'a self,
        operator: Op,
        input_streams: Iter,
        input_preference: OwnershipPreference,
    ) -> Stream<Self, O>
    where
        I: Data,
        O: Data,
        Op: NaryOperator<I, O>,
        Iter: IntoIterator<Item = &'a Stream<Self, I>>;
2353
    /// Use a constructor function to create a node.
    ///
    /// Used to add operators that do not implement any of the operator traits.
    /// The `constructor` closure is responsible for creating the node, its output stream,
    /// and connecting any input streams to the node.
    ///
    /// `constructor` receives the [`NodeId`] for the new node and returns the
    /// node together with an arbitrary value `R` (typically its output
    /// stream); that `R` is what this method returns.  `name` is presumably
    /// the node's display name — confirm against the implementation.
    fn add_custom_node<N: Node, R>(
        &self,
        name: Cow<'static, str>,
        constructor: impl FnOnce(NodeId) -> (N, R),
    ) -> R;
2364
    /// Add a feedback loop to the circuit.
    ///
    /// Other methods in this API only support the construction of acyclic
    /// graphs, as they require the input stream to exist before nodes that
    /// consume it are created.  This method instantiates an operator whose
    /// input stream can be connected later, and thus may depend on
    /// the operator's output.  This enables the construction of feedback loops.
    /// Since all loops in a well-formed circuit must include a [strict
    /// operator](`crate::circuit::operator_traits::StrictOperator`), `operator`
    /// must be [strict](`crate::circuit::operator_traits::StrictOperator`).
    ///
    /// Returns the output stream of the operator and a [`FeedbackConnector`]
    /// whose `connect` method later plugs in the operator's input.
    ///
    /// # Examples
    /// We build the following circuit to compute the sum of input values
    /// received from `source`. `z1` stores the sum accumulated during
    /// previous timestamps.  At every timestamp, the (`+`) operator
    /// computes the sum of the new value received from source and the value
    /// stored in `z1`.
    ///
    /// ```text
    ///                ┌─┐
    /// source ───────►│+├───┬─►
    ///           ┌───►└─┘   │
    ///           │          │ z1_feedback
    /// z1_output │    ┌──┐  │
    ///           └────┤z1│◄─┘
    ///                └──┘
    /// ```
    ///
    /// ```
    /// # use dbsp::{
    /// #     operator::{Z1, Generator},
    /// #     Circuit, RootCircuit,
    /// # };
    /// # let circuit = RootCircuit::build(|circuit| {
    /// // Create a data source.
    /// let source = circuit.add_source(Generator::new(|| 10));
    /// // Create z1.  `z1_output` will contain the output stream of `z1`; `z1_feedback`
    /// // is a placeholder where we can later plug the input to `z1`.
    /// let (z1_output, z1_feedback) = circuit.add_feedback(Z1::new(0));
    /// // Connect outputs of `source` and `z1` to the plus operator.
    /// let plus = source.apply2(&z1_output, |n1: &usize, n2: &usize| n1 + n2);
    /// // Connect the output of `+` as input to `z1`.
    /// z1_feedback.connect(&plus);
    /// Ok(())
    /// # });
    /// ```
    fn add_feedback<I, O, Op>(
        &self,
        operator: Op,
    ) -> (Stream<Self, O>, FeedbackConnector<Self, I, O, Op>)
    where
        I: Data,
        O: Data,
        Op: StrictUnaryOperator<I, O>;
2422
2423    /// Like `add_feedback`, but also assigns persistent id to the output half of the the strict operator.
2424    fn add_feedback_persistent<I, O, Op>(
2425        &self,
2426        persistent_id: Option<&str>,
2427        operator: Op,
2428    ) -> (Stream<Self, O>, FeedbackConnector<Self, I, O, Op>)
2429    where
2430        I: Data,
2431        O: Data,
2432        Op: StrictUnaryOperator<I, O>,
2433    {
2434        let (output, feedback) = self.add_feedback(operator);
2435
2436        output.set_persistent_id(persistent_id);
2437
2438        (output, feedback)
2439    }
2440
    /// Like `add_feedback`, but additionally makes the output of the operator
    /// available to the parent circuit.
    ///
    /// Normally a [strict
    /// operator](`crate::circuit::operator_traits::StrictOperator`) writes a
    /// value computed based on inputs from previous clock cycles to its
    /// output stream at the start of each new clock cycle.  When the local
    /// clock epoch ends, the last value computed by the operator (that
    /// would otherwise be dropped) is written to the export stream instead.
    ///
    /// Returns an [`ExportStream`] and the [`FeedbackConnector`] for the
    /// operator's input.  As the example in [`Self::iterate`] shows, the
    /// `local` field is the stream used inside this circuit, and the `export`
    /// field is the stream handed back to the parent.
    ///
    /// # Examples
    ///
    /// See example in the [`Self::iterate`] method.
    fn add_feedback_with_export<I, O, Op>(
        &self,
        operator: Op,
    ) -> (ExportStream<Self, O>, FeedbackConnector<Self, I, O, Op>)
    where
        I: Data,
        O: Data,
        Op: StrictUnaryOperator<I, O>;
2462
2463    /// Like `add_feedback_with_export`, but also assigns persistent id to the output half of the strict operator.
2464    fn add_feedback_with_export_persistent<I, O, Op>(
2465        &self,
2466        persistent_id: Option<&str>,
2467        operator: Op,
2468    ) -> (ExportStream<Self, O>, FeedbackConnector<Self, I, O, Op>)
2469    where
2470        I: Data,
2471        O: Data,
2472        Op: StrictUnaryOperator<I, O>,
2473    {
2474        let (export, feedback) = self.add_feedback_with_export(operator);
2475
2476        export.local.set_persistent_id(persistent_id);
2477
2478        (export, feedback)
2479    }
2480
    /// Connect `input_stream` to the input half of a feedback operator created
    /// by one of the `add_feedback*` methods, using `input_preference` as the
    /// ownership preference for that input.
    ///
    /// `output_node_id` identifies the node holding the operator's output
    /// half.  `operator` is the operator itself, wrapped in `Rc<RefCell<_>>`
    /// (presumably so it can be shared with that output node).
    ///
    /// NOTE(review): this appears to be the hook behind
    /// [`FeedbackConnector`]'s `connect`; confirm before calling it directly.
    fn connect_feedback_with_preference<I, O, Op>(
        &self,
        output_node_id: NodeId,
        operator: Rc<RefCell<Op>>,
        input_stream: &Stream<Self, I>,
        input_preference: OwnershipPreference,
    ) where
        I: Data,
        O: Data,
        Op: StrictUnaryOperator<I, O>;
2491
    /// Add an iterative child circuit.
    ///
    /// Creates a child circuit with a nested logical clock.
    ///
    /// Creates an empty circuit with `self` as parent and invokes
    /// `child_constructor` to populate the circuit.  `child_constructor`
    /// typically captures some of the streams in `self` and connects them
    /// to source nodes of the child circuit.  It is also responsible for
    /// attaching an executor to the child circuit.  The return type `T`
    /// will typically contain output streams of the child.
    ///
    /// `child_constructor` returns the user value `T` together with the
    /// executor `E` for the child.  Only `T` is handed back to the caller.
    ///
    /// Most users should invoke higher-level APIs like [`Circuit::iterate`]
    /// instead of using this method directly.
    fn iterative_subcircuit<F, T, E>(&self, child_constructor: F) -> Result<T, SchedulerError>
    where
        F: FnOnce(&mut IterativeCircuit<Self>) -> Result<(T, E), SchedulerError>,
        E: Executor<IterativeCircuit<Self>>;
2509
    /// Like [`Self::iterative_subcircuit`], but creates a child circuit that
    /// runs on the same clock as the parent.
    ///
    /// `child_constructor` has the same contract: it populates the child and
    /// returns `(T, E)`, of which only `T` is returned to the caller.
    fn non_iterative_subcircuit<F, T, E>(&self, child_constructor: F) -> Result<T, SchedulerError>
    where
        F: FnOnce(&mut NonIterativeCircuit<Self>) -> Result<(T, E), SchedulerError>,
        E: Executor<NonIterativeCircuit<Self>>;
2516
    /// Add an iteratively scheduled child circuit.
    ///
    /// Add a child circuit with a nested clock.  The child will execute
    /// multiple times for each parent timestamp, until its termination
    /// condition is satisfied.  Every time the child circuit is activated
    /// by the parent (once per parent timestamp), the executor calls
    /// [`clock_start`](`super::operator_traits::Operator::clock_start`)
    /// on each child operator.  It then calls `eval` on all
    /// child operators in a causal order and checks if the termination
    /// condition is satisfied.  If the condition is `false`, the
    /// executor `eval`s all operators again.  Once the termination
    /// condition is `true`, the executor calls `clock_end` on all child
    /// operators and returns control back to the parent scheduler.
    ///
    /// The `constructor` closure populates the child circuit and returns a
    /// closure that checks the termination condition and an arbitrary
    /// user-defined return value that typically contains output streams
    /// of the child.
    ///
    /// The termination closure `C` is async.  Resolving to `Ok(true)` ends the
    /// iteration for the current parent timestamp.
    ///
    /// # Examples
    ///
    /// ```
    /// # use std::{cell::RefCell, rc::Rc};
    /// use dbsp::{
    ///     operator::{Generator, Z1},
    ///     Circuit, RootCircuit,
    ///     Error as DbspError,
    /// };
    ///
    /// let (circuit_handle, output_handle) = RootCircuit::build(|root_circuit| {
    ///     // Generate sequence 0, 1, 2, ...
    ///     let mut n: usize = 0;
    ///     let source = root_circuit.add_source(Generator::new(move || {
    ///         let result = n;
    ///         n += 1;
    ///         result
    ///     }));
    ///     // Compute factorial of each number in the sequence.
    ///     let fact = root_circuit
    ///         .iterate(|child_circuit| {
    ///             let counter = Rc::new(RefCell::new(1));
    ///             let counter_clone = counter.clone();
    ///             let countdown = source.delta0(child_circuit).apply(move |parent_val| {
    ///                 let mut counter_borrow = counter_clone.borrow_mut();
    ///                 *counter_borrow += *parent_val;
    ///                 let res = *counter_borrow;
    ///                 *counter_borrow -= 1;
    ///                 res
    ///             });
    ///             let (z1_output, z1_feedback) = child_circuit.add_feedback_with_export(Z1::new(1));
    ///             let mul = countdown.apply2(&z1_output.local, |n1: &usize, n2: &usize| n1 * n2);
    ///             z1_feedback.connect(&mul);
    ///             // Stop iterating when the countdown reaches 0.
    ///             Ok((async move || Ok(*counter.borrow() == 0), z1_output.export))
    ///         })?;
    ///     Ok(fact.output())
    /// })?;
    ///
    /// let factorial = |n: usize| (1..=n).product::<usize>();
    /// const ITERATIONS: usize = 10;
    /// for i in 0..ITERATIONS {
    ///     circuit_handle.transaction()?;
    ///     let result = output_handle.take_from_all();
    ///     let result = result.first().unwrap();
    ///     println!("Iteration {:3}: {:3}! = {}", i + 1, i, result);
    ///     assert_eq!(*result, factorial(i));
    /// }
    ///
    /// Ok::<(), DbspError>(())
    /// ```
    fn iterate<F, C, T>(&self, constructor: F) -> Result<T, SchedulerError>
    where
        F: FnOnce(&mut IterativeCircuit<Self>) -> Result<(C, T), SchedulerError>,
        C: AsyncFn() -> Result<bool, SchedulerError> + 'static;
2591
    /// Add an iteratively scheduled child circuit.
    ///
    /// Similar to [`iterate`](`Self::iterate`), but with a user-specified
    /// [`Scheduler`] implementation.
    ///
    /// `S` decides the order in which the child's operators are evaluated.
    /// `constructor` follows the same contract as in `iterate`.
    fn iterate_with_scheduler<F, C, T, S>(&self, constructor: F) -> Result<T, SchedulerError>
    where
        F: FnOnce(&mut IterativeCircuit<Self>) -> Result<(C, T), SchedulerError>,
        C: AsyncFn() -> Result<bool, SchedulerError> + 'static,
        S: Scheduler + 'static;
2601
    /// Add a child circuit that will iterate to a fixed point.
    ///
    /// For each parent clock cycle, the child circuit will iterate until
    /// reaching a fixed point, i.e., a state where the outputs of all
    /// operators are guaranteed to remain the same, should the nested clock
    /// continue ticking.
    ///
    /// The fixed point check is implemented by checking the following
    /// condition:
    ///
    /// * All operators in the circuit are in such a state that, if their inputs
    ///   remain constant (i.e., all future inputs are identical to the last
    ///   input), then their outputs remain constant too.
    ///
    /// This is a necessary and sufficient condition that is also easy to check
    /// by asking each operator if it is in a stable state (via the
    /// [`Operator::fixedpoint`](`super::operator_traits::Operator::fixedpoint`)
    /// API).
    ///
    /// Unlike [`iterate`](`Self::iterate`), `constructor` returns only the
    /// user-defined value `T`; no termination closure is supplied.
    ///
    /// # Warning
    ///
    /// The cost of checking this condition precisely can be high for some
    /// operators, which implement approximate checks instead.  For instance,
    /// delay operators ([`Z1`](`crate::operator::Z1`) and
    /// [`Z1Nested`](`crate::operator::Z1Nested`)) require storing the last
    /// two versions of the state instead of one and comparing them at each
    /// cycle.  Instead, they conservatively check for _specific_ fixed points,
    /// namely fixed points where both input and output of the operator are
    /// zero.  As a result, the circuit may fail to detect other fixed points
    /// and may iterate forever.
    ///
    /// The goal is to evolve the design so that circuits created using the
    /// high-level API (`Stream::xxx` methods) implement accurate fixed point
    /// checks, but there are currently no guardrails in the system against
    /// constructing non-compliant circuits.
    fn fixedpoint<F, T>(&self, constructor: F) -> Result<T, SchedulerError>
    where
        F: FnOnce(&mut IterativeCircuit<Self>) -> Result<T, SchedulerError>;
2640
    /// Add a child circuit that will iterate to a fixed point.
    ///
    /// Similar to [`fixedpoint`](`Self::fixedpoint`), but with a user-specified
    /// [`Scheduler`] implementation.
    ///
    /// `S` decides the order in which the child's operators are evaluated.
    /// The same fixed-point caveats as in `fixedpoint` apply.
    fn fixedpoint_with_scheduler<F, T, S>(&self, constructor: F) -> Result<T, SchedulerError>
    where
        F: FnOnce(&mut IterativeCircuit<Self>) -> Result<T, SchedulerError>,
        S: Scheduler + 'static;
2649
    /// Make the contents of `parent_stream` available in the nested circuit
    /// via an [`ImportOperator`].
    ///
    /// Typically invoked via a convenience wrapper, e.g., [`Stream::delta0`].
    ///
    /// Returns the stream produced by `operator` inside this circuit.  The
    /// method is only available when this circuit has a parent circuit
    /// (`Self::Parent: Circuit`).
    fn import_stream<I, O, Op>(
        &self,
        operator: Op,
        parent_stream: &Stream<Self::Parent, I>,
    ) -> Stream<Self, O>
    where
        Self::Parent: Circuit,
        I: Data,
        O: Data,
        Op: ImportOperator<I, O>;
2664
    /// Like [`Self::import_stream`] but overrides the ownership
    /// preference on the input stream with `input_preference`.
    fn import_stream_with_preference<I, O, Op>(
        &self,
        operator: Op,
        parent_stream: &Stream<Self::Parent, I>,
        input_preference: OwnershipPreference,
    ) -> Stream<Self, O>
    where
        Self::Parent: Circuit,
        I: Data,
        O: Data,
        Op: ImportOperator<I, O>;
2678}
2679
/// A collection of edges in a circuit, indexed by source, destination, and stream id.
///
/// Each edge is allocated once behind an `Rc` and shared by all three indexes.
pub struct Edges {
    /// Edges keyed by their source node (`Edge::from`).
    by_source: BTreeMap<NodeId, Vec<Rc<Edge>>>,
    /// Edges keyed by their destination node (`Edge::to`).
    by_destination: BTreeMap<NodeId, Vec<Rc<Edge>>>,
    /// Edges keyed by the id of the stream they carry; edges that carry no
    /// stream are filed under `None`.
    by_stream: BTreeMap<Option<StreamId>, Vec<Rc<Edge>>>,
}
2686
2687impl Edges {
2688    fn new() -> Self {
2689        Self {
2690            by_source: BTreeMap::new(),
2691            by_destination: BTreeMap::new(),
2692            by_stream: BTreeMap::new(),
2693        }
2694    }
2695
2696    fn add_edge(&mut self, edge: Edge) {
2697        let edge = Rc::new(edge);
2698
2699        self.by_source
2700            .entry(edge.from)
2701            .or_default()
2702            .push(edge.clone());
2703        self.by_destination
2704            .entry(edge.to)
2705            .or_default()
2706            .push(edge.clone());
2707
2708        self.by_stream
2709            .entry(edge.stream.as_ref().map(|s| s.stream_id()))
2710            .or_default()
2711            .push(edge);
2712    }
2713
2714    fn extend<I>(&mut self, edges: I)
2715    where
2716        I: IntoIterator<Item = Edge>,
2717    {
2718        for edge in edges {
2719            self.add_edge(edge)
2720        }
2721    }
2722
2723    pub(crate) fn iter(&self) -> impl Iterator<Item = &Edge> {
2724        self.by_source
2725            .values()
2726            .flat_map(|edges| edges.iter().map(|edge| edge.as_ref()))
2727    }
2728
2729    pub(crate) fn get_by_stream_id(&self, stream_id: &Option<StreamId>) -> Option<&[Rc<Edge>]> {
2730        self.by_stream.get(stream_id).map(|v| v.as_slice())
2731    }
2732
2733    fn delete_stream(&mut self, stream_id: StreamId) {
2734        if let Some(edges) = self.by_stream.remove(&Some(stream_id)) {
2735            for edge in edges {
2736                if let Some(v) = self.by_source.get_mut(&edge.from) {
2737                    v.retain(|e| e.stream_id() != Some(stream_id))
2738                }
2739                if let Some(v) = self.by_destination.get_mut(&edge.to) {
2740                    v.retain(|e| e.stream_id() != Some(stream_id))
2741                }
2742            }
2743        }
2744    }
2745
2746    pub(crate) fn inputs_of(&self, node_id: NodeId) -> impl Iterator<Item = &Edge> {
2747        self.by_destination
2748            .get(&node_id)
2749            .into_iter()
2750            .flatten()
2751            .map(|edge| edge.as_ref())
2752    }
2753
2754    /// Nodes that depend on node_id directly.
2755    ///
2756    /// Nodes that have an incoming _dependency_ edge from `node_id`.
2757    pub(crate) fn depend_on(&self, node_id: NodeId) -> impl Iterator<Item = &Edge> {
2758        self.by_source.get(&node_id).into_iter().flat_map(|edges| {
2759            edges.iter().filter_map(|edge| {
2760                if edge.is_dependency() {
2761                    Some(edge.as_ref())
2762                } else {
2763                    None
2764                }
2765            })
2766        })
2767    }
2768
2769    /// Nodes that `node_id` depends on directly.
2770    ///
2771    /// Nodes that have an outgoing _dependency_ edge to `node_id`.
2772    pub(crate) fn dependencies_of(&self, node_id: NodeId) -> impl Iterator<Item = &Edge> {
2773        self.by_destination
2774            .get(&node_id)
2775            .into_iter()
2776            .flat_map(|edges| {
2777                edges.iter().filter_map(|edge| {
2778                    if edge.is_dependency() {
2779                        Some(edge.as_ref())
2780                    } else {
2781                        None
2782                    }
2783                })
2784            })
2785    }
2786
2787    fn clear(&mut self) {
2788        *self = Self::new();
2789    }
2790}
2791
/// A circuit consists of nodes and edges.  An edge from
/// node1 to node2 indicates that the output stream of node1
/// is connected to an input of node2.
///
/// Some edges carry no stream and only record an evaluation dependency
/// (see `Edges::depend_on` / `Edges::dependencies_of`).
struct CircuitInner<P>
where
    P: 'static,
{
    /// Parent circuit; `()` for the root circuit.
    parent: P,

    /// Root of the circuit tree.  `None` if this is the root circuit.
    root: Option<RootCircuit>,

    /// Scope value supplied at construction; `0` for the root circuit.
    root_scope: Scope,

    /// Circuit's node id within the parent circuit.
    node_id: NodeId,
    /// Globally unique id of this circuit's node.
    global_node_id: GlobalNodeId,
    /// Nodes of this circuit, in the order they were added.
    nodes: RefCell<Vec<RefCell<Box<dyn Node>>>>,
    /// Edges between this circuit's nodes.
    edges: RefCell<Edges>,
    /// Ids registered through `add_import_node`.
    import_nodes: RefCell<Vec<NodeId>>,
    /// Handlers invoked by `log_circuit_event`.
    circuit_event_handlers: CircuitEventHandlers,
    /// Handlers invoked by `log_scheduler_event`.
    scheduler_event_handlers: SchedulerEventHandlers,
    /// Circuit-wide cache; emptied by `clear`.
    store: RefCell<CircuitCache>,
    /// Stream id counter; starts at `StreamId::new(0)` for the root circuit.
    /// NOTE(review): presumably the most recently allocated id — confirm.
    last_stream_id: RefCell<StreamId>,
    /// Metadata exchange owned by this circuit.
    metadata_exchange: MetadataExchange,
    /// Balancer constructed from `metadata_exchange`.
    balancer: Rc<Balancer>,
}
2819
2820impl<P> CircuitInner<P>
2821where
2822    P: 'static,
2823{
2824    #[allow(clippy::too_many_arguments)]
2825    fn new(
2826        parent: P,
2827        root: Option<RootCircuit>,
2828        root_scope: Scope,
2829        node_id: NodeId,
2830        global_node_id: GlobalNodeId,
2831        circuit_event_handlers: CircuitEventHandlers,
2832        scheduler_event_handlers: SchedulerEventHandlers,
2833        last_stream_id: RefCell<StreamId>,
2834    ) -> Self {
2835        let metadata_exchange = MetadataExchange::new();
2836
2837        Self {
2838            parent,
2839            root,
2840            root_scope,
2841            node_id,
2842            global_node_id,
2843            nodes: RefCell::new(Vec::new()),
2844            edges: RefCell::new(Edges::new()),
2845            import_nodes: RefCell::new(Vec::new()),
2846            circuit_event_handlers,
2847            scheduler_event_handlers,
2848            store: RefCell::new(TypedMap::new()),
2849            last_stream_id,
2850            metadata_exchange: metadata_exchange.clone(),
2851            balancer: Rc::new(Balancer::new(&metadata_exchange)),
2852        }
2853    }
2854
2855    fn add_edge(&self, edge: Edge) {
2856        self.edges.borrow_mut().add_edge(edge);
2857    }
2858
2859    fn add_node<N>(&self, mut node: N)
2860    where
2861        N: Node + 'static,
2862    {
2863        node.init();
2864        self.nodes
2865            .borrow_mut()
2866            .push(RefCell::new(Box::new(node) as Box<dyn Node>));
2867    }
2868
2869    fn add_import_node(&self, node_id: NodeId) {
2870        self.import_nodes.borrow_mut().push(node_id);
2871    }
2872
2873    fn import_nodes(&self) -> Vec<NodeId> {
2874        self.import_nodes.borrow().clone()
2875    }
2876
2877    fn clear(&self) {
2878        self.nodes.borrow_mut().clear();
2879        self.edges.borrow_mut().clear();
2880        self.store.borrow_mut().clear();
2881    }
2882
2883    fn register_circuit_event_handler<F>(&self, name: &str, handler: F)
2884    where
2885        F: Fn(&CircuitEvent) + 'static,
2886    {
2887        self.circuit_event_handlers.borrow_mut().insert(
2888            name.to_string(),
2889            Box::new(handler) as Box<dyn Fn(&CircuitEvent)>,
2890        );
2891    }
2892
2893    fn unregister_circuit_event_handler(&self, name: &str) -> bool {
2894        self.circuit_event_handlers
2895            .borrow_mut()
2896            .remove(name)
2897            .is_some()
2898    }
2899
2900    fn register_scheduler_event_handler<F>(&self, name: &str, handler: F)
2901    where
2902        F: FnMut(&SchedulerEvent<'_>) + 'static,
2903    {
2904        self.scheduler_event_handlers.borrow_mut().insert(
2905            name.to_string(),
2906            Box::new(handler) as Box<dyn FnMut(&SchedulerEvent<'_>)>,
2907        );
2908    }
2909
2910    fn unregister_scheduler_event_handler(&self, name: &str) -> bool {
2911        self.scheduler_event_handlers
2912            .borrow_mut()
2913            .remove(name)
2914            .is_some()
2915    }
2916
2917    fn log_circuit_event(&self, event: &CircuitEvent) {
2918        for (_, handler) in self.circuit_event_handlers.borrow().iter() {
2919            handler(event)
2920        }
2921    }
2922
2923    fn log_scheduler_event(&self, event: &SchedulerEvent<'_>) {
2924        for (_, handler) in self.scheduler_event_handlers.borrow_mut().iter_mut() {
2925            handler(event)
2926        }
2927    }
2928
2929    fn check_fixedpoint(&self, scope: Scope) -> bool {
2930        self.nodes.borrow().iter().all(|node| {
2931            /*
2932            if !res {
2933                let n = node.borrow();
2934                let n1 = n.deref();
2935                eprintln!("not fixed {:?} {}", n1.global_id(), n1.name());
2936            };
2937            */
2938            node.borrow().fixedpoint(scope)
2939        })
2940    }
2941
2942    fn lookup_local_node_by_persistent_id(
2943        &self,
2944        persistent_id: &str,
2945    ) -> Result<GlobalNodeId, DbspError> {
2946        self.nodes
2947            .borrow()
2948            .iter()
2949            .find_map(|node| {
2950                if node.borrow().get_label(LABEL_PERSISTENT_OPERATOR_ID) == Some(persistent_id) {
2951                    Some(node.borrow().global_id().clone())
2952                } else {
2953                    None
2954                }
2955            })
2956            .ok_or_else(|| {
2957                DbspError::Runtime(RuntimeError::UnknownPersistentId(persistent_id.to_string()))
2958            })
2959    }
2960}
2961
/// A circuit.
///
/// A single implementation that can operate as the top-level
/// circuit when instantiated with `P = ()` or a nested circuit,
/// with `P = ChildCircuit<..>` designating the parent circuit type.
///
/// `T` is the circuit's timestamp type.  A `ChildCircuit` is a cheap handle:
/// cloning it shares the same inner state and timestamp rather than copying
/// them.
pub struct ChildCircuit<P, T>
where
    P: 'static,
    T: Timestamp,
{
    /// Nodes, edges, and other circuit state, shared by all clones.
    inner: Rc<CircuitInner<P>>,
    /// The circuit's timestamp, shared by all clones; the `RefCell` lets it be
    /// updated through a shared handle.
    time: Rc<RefCell<T>>,
}
2975
/// Top-level circuit.
///
/// `RootCircuit` is [`ChildCircuit`] with `P = ()` and `T = ()`.  It forms the
/// top level of a possibly nested DBSP circuit.  Every use of DBSP needs a
/// top-level circuit, and non-recursive queries, including all of standard
/// SQL, need only a top-level circuit.
///
/// Input enters a circuit through the top-level circuit only.  `RootCircuit`
/// has `add_input_*` methods for setting up input operators, which can only be
/// called within the callback passed to `RootCircuit::build`.  The data from
/// the input operators is represented as a [`Stream`], which may in turn be
/// used as input for further operators, which are primarily instantiated via
/// methods on [`Stream`].  Stream output may be made available outside the
/// bounds of a circuit using [`Stream::output`].
pub type RootCircuit = ChildCircuit<(), ()>;

/// A circuit nested directly inside a [`RootCircuit`], whose timestamp type
/// is the nested version of the root's (`<() as Timestamp>::Nested`).
pub type NestedCircuit = ChildCircuit<RootCircuit, <() as Timestamp>::Nested>;

/// A child circuit with a nested clock.
pub type IterativeCircuit<P> = ChildCircuit<P, <<P as WithClock>::Time as Timestamp>::Nested>;

/// A child circuit that runs on the same clock as the parent.
pub type NonIterativeCircuit<P> = ChildCircuit<P, <P as WithClock>::Time>;
2999
3000impl<P, T> Clone for ChildCircuit<P, T>
3001where
3002    P: 'static,
3003    T: Timestamp,
3004{
3005    fn clone(&self) -> Self {
3006        Self {
3007            inner: self.inner.clone(),
3008            time: self.time.clone(),
3009        }
3010    }
3011}
3012
3013impl<P, T> ChildCircuit<P, T>
3014where
3015    P: 'static,
3016    T: Timestamp,
3017{
3018    /// Immutably borrow the inner circuit.
3019    fn inner(&self) -> &CircuitInner<P> {
3020        &self.inner
3021    }
3022}
3023
3024impl RootCircuit {
3025    /// Creates a circuit and prepares it for execution by calling
3026    /// `constructor`.  The constructor should create input operators by calling
3027    /// [`RootCircuit::dyn_add_input_zset`] and related methods.  Each of these
3028    /// calls returns an input handle and a [`Stream`].  The `constructor` can
3029    /// call [`Stream`] methods to do computation, each of which yields further
3030    /// [`Stream`]s.  It can also use [`Stream::output`] to obtain an output
3031    /// handle.
3032    ///
3033    /// Returns a [`CircuitHandle`] with which the caller can control the
3034    /// circuit, plus a user-defined value returned by the constructor.  The
3035    /// `constructor` should use the latter to return the input and output
3036    /// handles it obtains, because these allow the caller to feed input into
3037    /// the circuit and read output from the circuit.
3038    ///
3039    /// The default scheduler, currently [`DynamicScheduler`], will decide the
3040    /// order in which to evaluate operators.  (This scheduler does not schedule
3041    /// processes or threads.)
3042    ///
3043    /// A client may use the returned [`CircuitHandle`] to run the circuit in
3044    /// the context of the current thread.  To instead run the circuit in a
3045    /// collection of worker threads, call [`Runtime::init_circuit`]
3046    /// instead.
3047    ///
3048    /// # Example
3049    ///
3050    /// ```
3051    /// use dbsp::{
3052    ///     operator::{Generator, Inspect},
3053    ///     Circuit, RootCircuit,
3054    /// };
3055    ///
3056    /// let circuit = RootCircuit::build(|circuit| {
3057    ///     // Add a source operator.
3058    ///     let source_stream = circuit.add_source(Generator::new(|| "Hello, world!".to_owned()));
3059    ///
3060    ///     // Add a unary operator and wire the source directly to it.
3061    ///     circuit.add_unary_operator(
3062    ///         Inspect::new(|n| println!("New output: {}", n)),
3063    ///         &source_stream,
3064    ///     );
3065    ///     Ok(())
3066    /// });
3067    /// ```
3068    pub fn build<F, T>(constructor: F) -> Result<(CircuitHandle, T), DbspError>
3069    where
3070        F: FnOnce(&mut RootCircuit) -> Result<T, AnyError>,
3071    {
3072        Self::build_with_scheduler::<F, T, DynamicScheduler>(constructor)
3073    }
3074
3075    /// Create a circuit and prepare it for execution.
3076    ///
3077    /// Similar to [`build`](`Self::build`), but with a user-specified
3078    /// [`Scheduler`] implementation that decides the order in which to evaluate
3079    /// operators.  (This scheduler does not schedule processes or threads.)
3080    pub fn build_with_scheduler<F, T, S>(constructor: F) -> Result<(CircuitHandle, T), DbspError>
3081    where
3082        F: FnOnce(&mut RootCircuit) -> Result<T, AnyError>,
3083        S: Scheduler + 'static,
3084    {
3085        // TODO: user LocalRuntime instead of Runtime + LocalSet when
3086        // tokio::LocalRuntime is stable.
3087        // Local tokio runtime that schedules operators on the current worker thread.
3088        let tokio_runtime = tokio::runtime::Builder::new_current_thread()
3089            .build()
3090            .map_err(|e| {
3091                DbspError::Scheduler(SchedulerError::TokioError {
3092                    error: e.to_string(),
3093                })
3094            })?;
3095
3096        let mut circuit = RootCircuit::new();
3097        let res = constructor(&mut circuit).map_err(DbspError::Constructor)?;
3098        let mut executor = Box::new(<OnceExecutor<S>>::new()) as Box<dyn Executor<RootCircuit>>;
3099        executor.prepare(&circuit, None)?;
3100
3101        // if Runtime::worker_index() == 0 {
3102        //     circuit.to_dot_file(
3103        //         |node| {
3104        //             Some(crate::utils::DotNodeAttributes::new().with_label(&format!(
3105        //                 "{}-{}-{}",
3106        //                 node.local_id(),
3107        //                 node.name(),
3108        //                 node.persistent_id().unwrap_or_default()
3109        //             )))
3110        //         },
3111        //         |edge| {
3112        //             let style = if edge.is_dependency() {
3113        //                 Some("dotted".to_string())
3114        //             } else {
3115        //                 None
3116        //             };
3117        //             let label = if let Some(stream) = &edge.stream {
3118        //                 Some(format!("consumers: {}", stream.num_consumers()))
3119        //             } else {
3120        //                 None
3121        //             };
3122        //             Some(
3123        //                 crate::utils::DotEdgeAttributes::new(edge.stream_id())
3124        //                     .with_style(style)
3125        //                     .with_label(label),
3126        //             )
3127        //         },
3128        //         "circuit.dot",
3129        //     );
3130        //     println!("circuit written to circuit.dot");
3131        // }
3132
3133        // Alternatively, `CircuitHandle` should expose `clock_start` and `clock_end`
3134        // APIs, so that the user can reset the circuit at runtime and start
3135        // evaluation from clean state without having to rebuild it from
3136        // scratch.
3137        circuit.log_scheduler_event(&SchedulerEvent::clock_start());
3138        circuit.clock_start(0);
3139        Ok((
3140            CircuitHandle {
3141                circuit,
3142                executor,
3143                tokio_runtime,
3144                replay_info: None,
3145            },
3146            res,
3147        ))
3148    }
3149}
3150
3151impl RootCircuit {
3152    // Create new top-level circuit.  Clients invoke this via the
3153    // [`RootCircuit::build`] API.
3154    fn new() -> Self {
3155        Self {
3156            inner: Rc::new(CircuitInner::new(
3157                (),
3158                None,
3159                0,
3160                NodeId::root(),
3161                GlobalNodeId::root(),
3162                Rc::new(RefCell::new(HashMap::new())),
3163                Rc::new(RefCell::new(HashMap::new())),
3164                RefCell::new(StreamId::new(0)),
3165            )),
3166            time: Rc::new(RefCell::new(())),
3167        }
3168    }
3169}
3170
3171impl RootCircuit {
3172    /// Attach a circuit event handler to the top-level circuit (see
3173    /// [`super::trace::CircuitEvent`] for a description of circuit events).
3174    ///
3175    /// This method should normally be called inside the closure passed to
3176    /// [`RootCircuit::build`] before adding any operators to the circuit, so
3177    /// that the handler gets to observe all nodes, edges, and subcircuits
3178    /// added to the circuit.
3179    ///
3180    /// `name` - user-readable name assigned to the handler.  If a handler with
3181    /// the same name exists, it will be replaced by the new handler.
3182    ///
3183    /// `handler` - user callback invoked on each circuit event (see
3184    /// [`super::trace::CircuitEvent`]).
3185    ///
3186    /// # Examples
3187    ///
3188    /// ```text
3189    /// TODO
3190    /// ```
3191    pub fn register_circuit_event_handler<F>(&self, name: &str, handler: F)
3192    where
3193        F: Fn(&CircuitEvent) + 'static,
3194    {
3195        self.inner().register_circuit_event_handler(name, handler);
3196    }
3197
3198    /// Remove a circuit event handler.  Returns `true` if a handler with the
3199    /// specified name had previously been registered and `false` otherwise.
3200    pub fn unregister_circuit_event_handler(&self, name: &str) -> bool {
3201        self.inner().unregister_circuit_event_handler(name)
3202    }
3203
3204    /// Attach a scheduler event handler to the top-level circuit (see
3205    /// [`super::trace::SchedulerEvent`] for a description of scheduler
3206    /// events).
3207    ///
3208    /// This method can be used during circuit construction, inside the closure
3209    /// provided to [`RootCircuit::build`].  Use
3210    /// [`CircuitHandle::register_scheduler_event_handler`],
3211    /// [`CircuitHandle::unregister_scheduler_event_handler`] to manipulate
3212    /// scheduler callbacks at runtime.
3213    ///
3214    /// `name` - user-readable name assigned to the handler.  If a handler with
3215    /// the same name exists, it will be replaced by the new handler.
3216    ///
3217    /// `handler` - user callback invoked on each scheduler event.
3218    pub fn register_scheduler_event_handler<F>(&self, name: &str, handler: F)
3219    where
3220        F: FnMut(&SchedulerEvent<'_>) + 'static,
3221    {
3222        self.inner().register_scheduler_event_handler(name, handler);
3223    }
3224
3225    /// Remove a scheduler event handler.  Returns `true` if a handler with the
3226    /// specified name had previously been registered and `false` otherwise.
3227    pub fn unregister_scheduler_event_handler(&self, name: &str) -> bool {
3228        self.inner().unregister_scheduler_event_handler(name)
3229    }
3230}
3231
3232impl<P, T> ChildCircuit<P, T>
3233where
3234    P: Circuit,
3235    T: Timestamp,
3236{
3237    /// Create an empty nested circuit of `parent`.
3238    fn with_parent(parent: P, id: NodeId) -> Self {
3239        let global_node_id = parent.global_node_id().child(id);
3240        let circuit_handlers = parent.circuit_event_handlers();
3241        let sched_handlers = parent.scheduler_event_handlers();
3242        let root_scope = parent.root_scope() + 1;
3243        let last_stream_id = parent.last_stream_id();
3244
3245        let root = parent.root_circuit();
3246
3247        ChildCircuit {
3248            inner: Rc::new(CircuitInner::new(
3249                parent,
3250                Some(root),
3251                root_scope,
3252                id,
3253                global_node_id,
3254                circuit_handlers,
3255                sched_handlers,
3256                last_stream_id,
3257            )),
3258            time: Rc::new(RefCell::new(Timestamp::clock_start())),
3259        }
3260    }
3261
3262    /// `true` if `self` is a subcircuit of `other`.
3263    pub fn is_child_of(&self, other: &P) -> bool {
3264        P::ptr_eq(&self.inner().parent, other)
3265    }
3266}
3267
3268// Internal API.
3269impl<P, T> ChildCircuit<P, T>
3270where
3271    P: 'static,
3272    T: Timestamp,
3273    Self: Circuit,
3274{
3275    /// Circuit's node id within the parent circuit.
3276    fn node_id(&self) -> NodeId {
3277        self.inner().node_id
3278    }
3279
3280    /// Add a node to the circuit.
3281    ///
3282    /// Allocates a new node id and invokes a user callback to create a new node
3283    /// instance. The callback may use the node id, e.g., to add an edge to
3284    /// this node.
3285    fn add_node<F, N, V>(&self, f: F) -> V
3286    where
3287        F: FnOnce(NodeId) -> (N, V),
3288        N: Node + 'static,
3289    {
3290        let id = self.inner().nodes.borrow().len();
3291
3292        // We don't hold a reference to `self.inner()` while calling `f`, so it can
3293        // safely modify the circuit, e.g., add edges.
3294        let (node, res) = f(NodeId(id));
3295        self.inner().add_node(node);
3296        res
3297    }
3298
3299    fn add_import_node(&self, node_id: NodeId) {
3300        self.inner().add_import_node(node_id);
3301    }
3302
3303    /// Like `add_node`, but the node is not created if the closure fails.
3304    fn try_add_node<F, N, V, E>(&self, f: F) -> Result<V, E>
3305    where
3306        F: FnOnce(NodeId) -> Result<(N, V), E>,
3307        N: Node + 'static,
3308    {
3309        let id = self.inner().nodes.borrow().len();
3310
3311        // We don't hold a reference to `self.inner()` while calling `f`, so it can
3312        // safely modify the circuit, e.g., add edges.
3313        let (node, res) = f(NodeId(id))?;
3314        self.inner().add_node(node);
3315        Ok(res)
3316    }
3317
3318    /// Send the specified `CircuitEvent` to all handlers attached to the
3319    /// circuit.
3320    fn log_circuit_event(&self, event: &CircuitEvent) {
3321        self.inner().log_circuit_event(event);
3322    }
3323
3324    /// Send the specified `SchedulerEvent` to all handlers attached to the
3325    /// circuit.
3326    pub(super) fn log_scheduler_event(&self, event: &SchedulerEvent<'_>) {
3327        self.inner().log_scheduler_event(event);
3328    }
3329}
3330
3331impl<P, T> CircuitBase for ChildCircuit<P, T>
3332where
3333    P: Clone + 'static,
3334    T: Timestamp,
3335{
3336    fn edges(&self) -> Ref<'_, Edges> {
3337        self.inner().edges.borrow()
3338    }
3339
3340    fn transitive_ancestors(&self) -> BTreeMap<NodeId, BTreeSet<NodeId>> {
3341        let edges = self.edges();
3342        let mut result = BTreeMap::new();
3343
3344        // For each node, compute its transitive ancestors using BFS
3345        for node_id in self.node_ids() {
3346            let mut ancestors = BTreeSet::new();
3347            let mut queue = vec![node_id];
3348
3349            // BFS to find all transitive ancestors
3350            while let Some(current) = queue.pop() {
3351                for edge in edges.inputs_of(current) {
3352                    let ancestor_node = edge.from;
3353                    if ancestors.insert(ancestor_node) {
3354                        queue.push(ancestor_node);
3355                    }
3356                }
3357            }
3358
3359            result.insert(node_id, ancestors);
3360        }
3361
3362        result
3363    }
3364
3365    fn edges_mut(&self) -> RefMut<'_, Edges> {
3366        self.inner().edges.borrow_mut()
3367    }
3368
3369    fn num_nodes(&self) -> usize {
3370        self.inner().nodes.borrow().len()
3371    }
3372
3373    fn clear(&mut self) {
3374        self.inner().clear();
3375    }
3376
3377    fn add_dependency(&self, from: NodeId, to: NodeId) {
3378        self.log_circuit_event(&CircuitEvent::dependency(
3379            self.global_node_id().child(from),
3380            self.global_node_id().child(to),
3381        ));
3382
3383        let origin = self.global_node_id().child(from);
3384        self.inner().add_edge(Edge {
3385            from,
3386            to,
3387            origin,
3388            stream: None,
3389            ownership_preference: None,
3390        });
3391    }
3392
3393    /// Apply `f` to the node with the specified `path` relative to `self`.
3394    fn map_node_relative(&self, path: &[NodeId], f: &mut dyn FnMut(&dyn Node)) {
3395        let nodes = self.inner().nodes.borrow();
3396        let node = nodes[path[0].0].borrow();
3397        if path.len() == 1 {
3398            f(node.as_ref())
3399        } else {
3400            node.map_child(&path[1..], &mut |node| f(node));
3401        }
3402    }
3403
3404    /// Apply `f` to the node with the specified `path` relative to `self`.
3405    fn map_node_mut_relative(&self, path: &[NodeId], f: &mut dyn FnMut(&mut dyn Node)) {
3406        let nodes = self.inner().nodes.borrow();
3407        let mut node = nodes[path[0].0].borrow_mut();
3408        if path.len() == 1 {
3409            f(node.as_mut())
3410        } else {
3411            node.map_child_mut(&path[1..], &mut |node| f(node));
3412        }
3413    }
3414
3415    fn map_nodes_recursive(
3416        &self,
3417        f: &mut dyn FnMut(&dyn Node) -> Result<(), DbspError>,
3418    ) -> Result<(), DbspError> {
3419        for node in self.inner().nodes.borrow().iter() {
3420            f(node.borrow().as_ref())?;
3421            node.borrow().map_nodes_recursive(f)?;
3422        }
3423        Ok(())
3424    }
3425
3426    fn map_nodes_recursive_mut(
3427        &mut self,
3428        f: &mut dyn FnMut(&mut dyn Node) -> Result<(), DbspError>,
3429    ) -> Result<(), DbspError> {
3430        for node in self.inner().nodes.borrow_mut().iter_mut() {
3431            f(node.borrow_mut().as_mut())?;
3432            node.borrow_mut().map_nodes_recursive_mut(f)?;
3433        }
3434
3435        Ok(())
3436    }
3437
3438    fn map_local_nodes(
3439        &self,
3440        f: &mut dyn FnMut(&dyn Node) -> Result<(), DbspError>,
3441    ) -> Result<(), DbspError> {
3442        for node in self.inner().nodes.borrow().iter() {
3443            f(node.borrow().as_ref())?;
3444        }
3445        Ok(())
3446    }
3447
3448    fn map_local_nodes_mut(
3449        &self,
3450        f: &mut dyn FnMut(&mut dyn Node) -> Result<(), DbspError>,
3451    ) -> Result<(), DbspError> {
3452        for node in self.inner().nodes.borrow_mut().iter_mut() {
3453            f(node.borrow_mut().as_mut())?;
3454        }
3455
3456        Ok(())
3457    }
3458
3459    fn apply_local_node_mut(&self, id: NodeId, f: &mut dyn FnMut(&mut dyn Node)) {
3460        self.map_node_mut_relative(&[id], &mut |node| f(node));
3461    }
3462
3463    fn map_subcircuits(
3464        &self,
3465        f: &mut dyn FnMut(&dyn CircuitBase) -> Result<(), DbspError>,
3466    ) -> Result<(), DbspError> {
3467        for node in self.inner().nodes.borrow().iter() {
3468            let node = node.borrow();
3469            if let Some(child_circuit) = node.as_circuit() {
3470                f(child_circuit)?;
3471            }
3472        }
3473        Ok(())
3474    }
3475
3476    fn set_node_label(&self, id: &GlobalNodeId, key: &str, val: &str) {
3477        self.map_node_mut(id, &mut |node| node.set_label(key, val));
3478    }
3479
3480    fn get_node_label(&self, id: &GlobalNodeId, key: &str) -> Option<String> {
3481        self.map_node(id, &mut |node| node.get_label(key).map(str::to_string))
3482    }
3483
3484    fn global_id(&self) -> &GlobalNodeId {
3485        &self.inner().global_node_id
3486    }
3487
3488    /// Returns vector of local node ids in the circuit.
3489    fn node_ids(&self) -> Vec<NodeId> {
3490        self.inner()
3491            .nodes
3492            .borrow()
3493            .iter()
3494            .map(|node| node.borrow().local_id())
3495            .collect()
3496    }
3497
3498    fn lookup_local_node_by_persistent_id(
3499        &self,
3500        persistent_id: &str,
3501    ) -> Result<GlobalNodeId, DbspError> {
3502        self.inner()
3503            .lookup_local_node_by_persistent_id(persistent_id)
3504    }
3505
3506    fn import_nodes(&self) -> Vec<NodeId> {
3507        self.inner().import_nodes()
3508    }
3509
3510    fn allocate_stream_id(&self) -> StreamId {
3511        let circuit = self.inner();
3512        let mut last_stream_id = circuit.last_stream_id.borrow_mut();
3513        last_stream_id.0 += 1;
3514        *last_stream_id
3515    }
3516
3517    fn last_stream_id(&self) -> RefCell<StreamId> {
3518        self.inner().last_stream_id.clone()
3519    }
3520
3521    fn root_scope(&self) -> Scope {
3522        self.inner().root_scope
3523    }
3524
3525    fn node_id(&self) -> NodeId {
3526        self.inner().node_id
3527    }
3528
3529    fn global_node_id(&self) -> GlobalNodeId {
3530        self.inner().global_node_id.clone()
3531    }
3532
3533    fn check_fixedpoint(&self, scope: Scope) -> bool {
3534        self.inner().check_fixedpoint(scope)
3535    }
3536
3537    fn metadata_exchange(&self) -> &MetadataExchange {
3538        &self.inner().metadata_exchange
3539    }
3540
3541    fn balancer(&self) -> &Balancer {
3542        &self.inner().balancer
3543    }
3544
3545    fn set_auto_rebalance(&self, enable: bool) -> Result<(), DbspError> {
3546        self.inner().balancer.set_auto_rebalance(enable)
3547    }
3548
3549    fn set_balancer_hint_by_global_id(
3550        &self,
3551        global_node_id: &GlobalNodeId,
3552        hint: BalancerHint,
3553    ) -> Result<(), DbspError> {
3554        if global_node_id.parent_id() != Some(GlobalNodeId::root()) {
3555            return Err(DbspError::Balancer(BalancerError::NonTopLevelNode(
3556                global_node_id.clone(),
3557            )));
3558        }
3559
3560        self.inner()
3561            .balancer
3562            .set_hint(global_node_id.local_node_id().unwrap(), hint)
3563    }
3564
3565    fn set_balancer_hint(&self, persistent_id: &str, hint: BalancerHint) -> Result<(), DbspError> {
3566        let global_node_id = self.lookup_local_node_by_persistent_id(persistent_id)?;
3567        self.set_balancer_hint_by_global_id(&global_node_id, hint)
3568    }
3569
3570    fn get_current_balancer_policies(&self) -> BTreeMap<NodeId, PartitioningPolicy> {
3571        self.inner().balancer.get_policy()
3572    }
3573
3574    fn get_current_balancer_policy(
3575        &self,
3576        persistent_id: &str,
3577    ) -> Result<PartitioningPolicy, DbspError> {
3578        let global_node_id = self.lookup_local_node_by_persistent_id(persistent_id)?;
3579        let local_node_id = global_node_id.local_node_id().unwrap();
3580        self.inner()
3581            .balancer
3582            .get_policy_for_stream(local_node_id)
3583            .ok_or_else(|| {
3584                DbspError::Balancer(BalancerError::NotRegisteredWithBalancer(local_node_id))
3585            })
3586    }
3587
3588    fn rebalance(&self) {
3589        self.inner().balancer.rebalance()
3590    }
3591
3592    fn start_compaction(&self) {
3593        let _ = self.map_local_nodes_mut(&mut |node| {
3594            node.start_compaction();
3595            Ok(())
3596        });
3597    }
3598}
3599
3600impl<P, T> Circuit for ChildCircuit<P, T>
3601where
3602    P: Clone + 'static,
3603    T: Timestamp,
3604{
3605    type Parent = P;
3606
3607    fn parent(&self) -> P {
3608        self.inner().parent.clone()
3609    }
3610
    /// Returns the top-level circuit that this circuit belongs to.
    ///
    /// For the root circuit itself, this is a clone of `self`.  For nested
    /// circuits, it is the `root` handle stored in `CircuitInner` at
    /// construction time.
    fn root_circuit(&self) -> RootCircuit {
        if <dyn Any>::is::<RootCircuit>(self) {
            // SAFETY: the `is` check above compares type ids and succeeds only
            // when `Self` is exactly `RootCircuit`, so reinterpreting
            // `&Self` as `&RootCircuit` is an identity conversion.
            unsafe { transmute::<&Self, &RootCircuit>(self) }.clone()
        } else {
            // Nested circuits are created by `with_parent`, which stores
            // `Some(root)`; only the root circuit (handled above) has `None`.
            self.inner().root.as_ref().unwrap().clone()
        }
    }
3618
3619    fn map_node<V>(&self, id: &GlobalNodeId, f: &mut dyn FnMut(&dyn Node) -> V) -> V {
3620        let path = id.path();
3621        let mut result: Option<V> = None;
3622
3623        assert!(path.starts_with(self.global_id().path()));
3624
3625        self.map_node_relative(
3626            path.strip_prefix(self.global_id().path()).unwrap(),
3627            &mut |node| result = Some(f(node)),
3628        );
3629        result.unwrap()
3630    }
3631
3632    fn map_node_mut<V>(&self, id: &GlobalNodeId, f: &mut dyn FnMut(&mut dyn Node) -> V) -> V {
3633        let path = id.path();
3634        let mut result: Option<V> = None;
3635
3636        assert!(path.starts_with(self.global_id().path()));
3637
3638        self.map_node_mut_relative(
3639            path.strip_prefix(self.global_id().path()).unwrap(),
3640            &mut |node| result = Some(f(node)),
3641        );
3642        result.unwrap()
3643    }
3644
3645    fn map_local_node_mut<V>(&self, id: NodeId, f: &mut dyn FnMut(&mut dyn Node) -> V) -> V {
3646        let mut result: Option<V> = None;
3647
3648        self.map_node_mut_relative(&[id], &mut |node| result = Some(f(node)));
3649        result.unwrap()
3650    }
3651
3652    fn ptr_eq(this: &Self, other: &Self) -> bool {
3653        Rc::ptr_eq(&this.inner, &other.inner)
3654    }
3655
3656    fn circuit_event_handlers(&self) -> CircuitEventHandlers {
3657        self.inner().circuit_event_handlers.clone()
3658    }
3659
3660    fn scheduler_event_handlers(&self) -> SchedulerEventHandlers {
3661        self.inner().scheduler_event_handlers.clone()
3662    }
3663
3664    fn log_circuit_event(&self, event: &CircuitEvent) {
3665        self.inner().log_circuit_event(event);
3666    }
3667
3668    fn log_scheduler_event(&self, event: &SchedulerEvent<'_>) {
3669        self.inner().log_scheduler_event(event);
3670    }
3671
    /// Returns a mutable reference to the cached value for `key`, computing it
    /// with `f` and inserting it first if the key is absent.
    ///
    /// No borrow of the store is held while `f` runs, so `f` may itself access
    /// the cache.  If `f` inserts a value for `key`, that value is kept and
    /// the one returned by `f` is discarded (`or_insert` semantics).
    fn cache_get_or_insert_with<K, F>(&self, key: K, mut f: F) -> RefMut<'_, K::Value>
    where
        K: 'static + TypedMapKey<CircuitStoreMarker>,
        F: FnMut() -> K::Value,
    {
        // Don't use `store.entry()`, since `f` may need to perform
        // its own cache lookup.
        // Hit path: the shared borrow used for `contains_key` is released
        // before the mutable borrow is taken.
        if self.inner().store.borrow().contains_key(&key) {
            return RefMut::map(self.inner().store.borrow_mut(), |store| {
                store.get_mut(&key).unwrap()
            });
        }

        // Miss path: compute the value with no borrow of the store held.
        let new = f();

        // TODO: Use `RefMut::filter_map()` to only perform one lookup in the happy path
        //       https://github.com/rust-lang/rust/issues/81061
        RefMut::map(self.inner().store.borrow_mut(), |store| {
            store.entry(key).or_insert(new)
        })
    }
3693
3694    fn connect_stream<V: 'static>(
3695        &self,
3696        stream: &Stream<Self, V>,
3697        to: NodeId,
3698        ownership_preference: OwnershipPreference,
3699    ) {
3700        self.log_circuit_event(&CircuitEvent::stream(
3701            stream.origin_node_id().clone(),
3702            self.global_node_id().child(to),
3703            ownership_preference,
3704        ));
3705
3706        debug_assert_eq!(self.global_node_id(), stream.circuit.global_node_id());
3707        self.inner().add_edge(Edge {
3708            from: stream.local_node_id(),
3709            to,
3710            origin: stream.origin_node_id().clone(),
3711            stream: Some(Box::new(stream.clone())),
3712            ownership_preference: Some(ownership_preference),
3713        });
3714    }
3715
3716    fn tick(&self) {
3717        let mut time = self.time.borrow_mut();
3718        *time = time.advance(0);
3719    }
3720
3721    fn clock_start(&self, scope: Scope) {
3722        for node in self.inner().nodes.borrow_mut().iter_mut() {
3723            node.borrow_mut().clock_start(scope);
3724        }
3725    }
3726
3727    fn clock_end(&self, scope: Scope) {
3728        for node in self.inner().nodes.borrow_mut().iter_mut() {
3729            node.borrow_mut().clock_end(scope);
3730        }
3731
3732        let mut time = self.time.borrow_mut();
3733        *time = time.advance(scope + 1);
3734    }
3735
3736    fn ready(&self, id: NodeId) -> bool {
3737        self.inner().nodes.borrow()[id.0].borrow().ready()
3738    }
3739
3740    fn cache_insert<K>(&self, key: K, val: K::Value)
3741    where
3742        K: TypedMapKey<CircuitStoreMarker> + 'static,
3743    {
3744        self.inner().store.borrow_mut().insert(key, val);
3745    }
3746
3747    fn cache_contains<K>(&self, key: &K) -> bool
3748    where
3749        K: TypedMapKey<CircuitStoreMarker> + 'static,
3750    {
3751        self.inner().store.borrow().contains_key(key)
3752    }
3753
3754    fn cache_get<K>(&self, key: &K) -> Option<K::Value>
3755    where
3756        K: TypedMapKey<CircuitStoreMarker> + 'static,
3757        K::Value: Clone,
3758    {
3759        self.inner().store.borrow().get(key).cloned()
3760    }
3761
3762    fn register_ready_callback(&self, id: NodeId, cb: Box<dyn Fn() + Send + Sync>) {
3763        self.inner().nodes.borrow()[id.0]
3764            .borrow_mut()
3765            .register_ready_callback(cb);
3766    }
3767
3768    fn is_async_node(&self, id: NodeId) -> bool {
3769        self.inner().nodes.borrow()[id.0].borrow().is_async()
3770    }
3771
    /// Evaluates local node `id` and returns the progress it reports.
    ///
    /// Logs `SchedulerEvent::eval_start` before the evaluation and
    /// `SchedulerEvent::eval_end` (with the measured duration) after it.
    /// Also records an "eval" tracing span in the "Operator" category.
    /// If the node's `eval()` fails, the error is returned immediately, so
    /// neither the span nor the `eval_end` event is recorded.
    // Justification: the scheduler must not call `eval()` on a node twice.
    #[allow(clippy::await_holding_refcell_ref)]
    async fn eval_node(&self, id: NodeId) -> Result<Option<Position>, SchedulerError> {
        let circuit = self.inner();
        debug_assert!(id.0 < circuit.nodes.borrow().len());

        // Notify loggers while holding a reference to the inner circuit.
        // We normally avoid this, since a nested call from event handler
        // will panic in `self.inner()`, but we do it here as an
        // optimization.
        circuit.log_scheduler_event(&SchedulerEvent::eval_start(
            circuit.nodes.borrow()[id.0].borrow().as_ref(),
        ));

        // The tooltip is supplied as a closure; presumably it is only
        // evaluated when the span needs it -- confirm against `Span`.
        let span = Span::new("eval")
            .with_category("Operator")
            .with_tooltip(|| {
                let nodes = circuit.nodes.borrow();
                let node = nodes[id.0].borrow();
                format!("{} {}", node.name(), node.global_id().node_identifier())
            });
        // The node's `RefMut` is held across this `.await` (see the clippy
        // justification above).  `Timed` yields the evaluation result together
        // with a duration.
        let (result, duration) = Timed::new(circuit.nodes.borrow()[id.0].borrow_mut().eval()).await;
        // On error, return before recording the span or logging `eval_end`.
        let progress = result?;
        span.record();

        circuit.log_scheduler_event(&SchedulerEvent::eval_end(
            circuit.nodes.borrow()[id.0].borrow().as_ref(),
            duration,
        ));

        Ok(progress)
    }
3804
    /// Runs the `import()` step of local node `id`, which must be one of this
    /// circuit's import nodes (checked only in debug builds).
    // Justification: the scheduler must not call `eval()` on a node twice.
    #[allow(clippy::await_holding_refcell_ref)]
    async fn eval_import_node(&self, id: NodeId) {
        let circuit = self.inner();
        debug_assert!(id.0 < circuit.nodes.borrow().len());
        debug_assert!(circuit.import_nodes().contains(&id));

        // `circuit.nodes.borrow()` is safe across the await point because the
        // scheduler only borrows `circuit.nodes` during a step and never
        // borrows it mutably.
        //
        // `circuit.nodes.borrow()[id.0].borrow_mut()` is safe across the await
        // point because the scheduler never evaluates a node reentrantly.
        circuit.nodes.borrow()[id.0].borrow_mut().import().await;
    }
3820
3821    fn flush_node(&self, id: NodeId) {
3822        let circuit = self.inner();
3823        debug_assert!(id.0 < circuit.nodes.borrow().len());
3824
3825        circuit.nodes.borrow()[id.0].borrow_mut().flush();
3826    }
3827
3828    fn is_flush_complete(&self, id: NodeId) -> bool {
3829        let circuit = self.inner();
3830        debug_assert!(id.0 < circuit.nodes.borrow().len());
3831
3832        circuit.nodes.borrow()[id.0].borrow().is_flush_complete()
3833    }
3834
3835    #[track_caller]
3836    fn region<F, V>(&self, name: &str, f: F) -> V
3837    where
3838        F: FnOnce() -> V,
3839    {
3840        self.log_circuit_event(&CircuitEvent::push_region(name, Some(Location::caller())));
3841        let res = f();
3842        self.log_circuit_event(&CircuitEvent::pop_region());
3843        res
3844    }
3845
3846    #[track_caller]
3847    fn open_region(&self, name: RegionName) {
3848        self.log_circuit_event(&CircuitEvent::open_region(name, Some(Location::caller())));
3849    }
3850
3851    fn close_region(&self, name: RegionName) {
3852        self.log_circuit_event(&CircuitEvent::close_region(name));
3853    }
3854
3855    fn add_preprocessor(&self, preprocessor_node_id: NodeId) {
3856        for node in self.inner().nodes.borrow_mut().iter() {
3857            if node.borrow().is_input() {
3858                self.add_dependency(preprocessor_node_id, node.borrow().local_id());
3859            }
3860        }
3861    }
3862
3863    /// Add a source operator to the circuit.  See [`SourceOperator`].
3864    fn add_source<O, Op>(&self, operator: Op) -> Stream<Self, O>
3865    where
3866        O: Data,
3867        Op: SourceOperator<O>,
3868    {
3869        self.add_node(|id| {
3870            self.log_circuit_event(&CircuitEvent::operator(
3871                GlobalNodeId::child_of(self, id),
3872                operator.name(),
3873                operator.location(),
3874            ));
3875
3876            let node = SourceNode::new(operator, self.clone(), id);
3877            let output_stream = node.output_stream();
3878            (node, output_stream)
3879        })
3880    }
3881
3882    fn add_exchange<I, SndOp, O, RcvOp>(
3883        &self,
3884        sender: SndOp,
3885        receiver: RcvOp,
3886        input_stream: &Stream<Self, I>,
3887    ) -> Stream<Self, O>
3888    where
3889        I: Data,
3890        O: Data,
3891        SndOp: SinkOperator<I>,
3892        RcvOp: SourceOperator<O>,
3893    {
3894        let preference = sender.input_preference();
3895        self.add_exchange_with_preference(sender, receiver, input_stream, preference)
3896    }
3897
3898    fn add_exchange_with_preference<I, SndOp, O, RcvOp>(
3899        &self,
3900        sender: SndOp,
3901        receiver: RcvOp,
3902        input_stream: &Stream<Self, I>,
3903        input_preference: OwnershipPreference,
3904    ) -> Stream<Self, O>
3905    where
3906        I: Data,
3907        O: Data,
3908        SndOp: SinkOperator<I>,
3909        RcvOp: SourceOperator<O>,
3910    {
3911        let sender_id = self.add_node(|id| {
3912            self.log_circuit_event(&CircuitEvent::operator(
3913                GlobalNodeId::child_of(self, id),
3914                sender.name(),
3915                sender.location(),
3916            ));
3917
3918            let node = SinkNode::new(sender, input_stream.clone(), self.clone(), id);
3919            self.connect_stream(input_stream, id, input_preference);
3920            (node, id)
3921        });
3922
3923        let output_stream = self.add_node(|id| {
3924            self.log_circuit_event(&CircuitEvent::operator(
3925                GlobalNodeId::child_of(self, id),
3926                receiver.name(),
3927                receiver.location(),
3928            ));
3929
3930            let node = SourceNode::new(receiver, self.clone(), id);
3931            let output_stream = node.output_stream();
3932            (node, output_stream)
3933        });
3934
3935        self.add_dependency(sender_id, output_stream.local_node_id());
3936        output_stream
3937    }
3938
3939    fn add_sink<I, Op>(&self, operator: Op, input_stream: &Stream<Self, I>) -> GlobalNodeId
3940    where
3941        I: Data,
3942        Op: SinkOperator<I>,
3943    {
3944        let preference = operator.input_preference();
3945        self.add_sink_with_preference(operator, input_stream, preference)
3946    }
3947
3948    fn add_sink_with_preference<I, Op>(
3949        &self,
3950        operator: Op,
3951        input_stream: &Stream<Self, I>,
3952        input_preference: OwnershipPreference,
3953    ) -> GlobalNodeId
3954    where
3955        I: Data,
3956        Op: SinkOperator<I>,
3957    {
3958        self.add_node(|id| {
3959            let global_node_id = GlobalNodeId::child_of(self, id);
3960            // Log the operator event before the connection event, so that handlers
3961            // don't observe edges that connect to nodes they haven't seen yet.
3962            self.log_circuit_event(&CircuitEvent::operator(
3963                global_node_id.clone(),
3964                operator.name(),
3965                operator.location(),
3966            ));
3967
3968            self.connect_stream(input_stream, id, input_preference);
3969            (
3970                SinkNode::new(operator, input_stream.clone(), self.clone(), id),
3971                global_node_id,
3972            )
3973        })
3974    }
3975
3976    /// Add a binary sink operator (see [`BinarySinkOperator`]).
3977    fn add_binary_sink<I1, I2, Op>(
3978        &self,
3979        operator: Op,
3980        input_stream1: &Stream<Self, I1>,
3981        input_stream2: &Stream<Self, I2>,
3982    ) where
3983        I1: Data,
3984        I2: Data,
3985        Op: BinarySinkOperator<I1, I2>,
3986    {
3987        let (preference1, preference2) = operator.input_preference();
3988        self.add_binary_sink_with_preference(
3989            operator,
3990            (input_stream1, preference1),
3991            (input_stream2, preference2),
3992        )
3993    }
3994
3995    fn add_binary_sink_with_preference<I1, I2, Op>(
3996        &self,
3997        operator: Op,
3998        input_stream1: (&Stream<Self, I1>, OwnershipPreference),
3999        input_stream2: (&Stream<Self, I2>, OwnershipPreference),
4000    ) where
4001        I1: Data,
4002        I2: Data,
4003        Op: BinarySinkOperator<I1, I2>,
4004    {
4005        let (input_stream1, input_preference1) = input_stream1;
4006        let (input_stream2, input_preference2) = input_stream2;
4007
4008        self.add_node(|id| {
4009            self.log_circuit_event(&CircuitEvent::operator(
4010                GlobalNodeId::child_of(self, id),
4011                operator.name(),
4012                operator.location(),
4013            ));
4014
4015            let node = BinarySinkNode::new(
4016                operator,
4017                input_stream1.clone(),
4018                input_stream2.clone(),
4019                self.clone(),
4020                id,
4021            );
4022            self.connect_stream(input_stream1, id, input_preference1);
4023            self.connect_stream(input_stream2, id, input_preference2);
4024            (node, ())
4025        });
4026    }
4027
4028    /// Add a ternary sink operator (see [`TernarySinkOperator`]).
4029    fn add_ternary_sink<I1, I2, I3, Op>(
4030        &self,
4031        operator: Op,
4032        input_stream1: &Stream<Self, I1>,
4033        input_stream2: &Stream<Self, I2>,
4034        input_stream3: &Stream<Self, I3>,
4035    ) -> GlobalNodeId
4036    where
4037        I1: Data,
4038        I2: Data,
4039        I3: Data,
4040        Op: TernarySinkOperator<I1, I2, I3>,
4041    {
4042        let (preference1, preference2, preference3) = operator.input_preference();
4043        self.add_ternary_sink_with_preference(
4044            operator,
4045            (input_stream1, preference1),
4046            (input_stream2, preference2),
4047            (input_stream3, preference3),
4048        )
4049    }
4050
4051    fn add_ternary_sink_with_preference<I1, I2, I3, Op>(
4052        &self,
4053        operator: Op,
4054        input_stream1: (&Stream<Self, I1>, OwnershipPreference),
4055        input_stream2: (&Stream<Self, I2>, OwnershipPreference),
4056        input_stream3: (&Stream<Self, I3>, OwnershipPreference),
4057    ) -> GlobalNodeId
4058    where
4059        I1: Data,
4060        I2: Data,
4061        I3: Data,
4062        Op: TernarySinkOperator<I1, I2, I3>,
4063    {
4064        let (input_stream1, input_preference1) = input_stream1;
4065        let (input_stream2, input_preference2) = input_stream2;
4066        let (input_stream3, input_preference3) = input_stream3;
4067
4068        self.add_node(|id| {
4069            let global_node_id = GlobalNodeId::child_of(self, id);
4070
4071            self.log_circuit_event(&CircuitEvent::operator(
4072                GlobalNodeId::child_of(self, id),
4073                operator.name(),
4074                operator.location(),
4075            ));
4076
4077            let node = TernarySinkNode::new(
4078                operator,
4079                input_stream1.clone(),
4080                input_stream2.clone(),
4081                input_stream3.clone(),
4082                self.clone(),
4083                id,
4084            );
4085            self.connect_stream(input_stream1, id, input_preference1);
4086            self.connect_stream(input_stream2, id, input_preference2);
4087            self.connect_stream(input_stream3, id, input_preference3);
4088            (node, global_node_id)
4089        })
4090    }
4091
4092    fn add_unary_operator<I, O, Op>(
4093        &self,
4094        operator: Op,
4095        input_stream: &Stream<Self, I>,
4096    ) -> Stream<Self, O>
4097    where
4098        I: Data,
4099        O: Data,
4100        Op: UnaryOperator<I, O>,
4101    {
4102        let preference = operator.input_preference();
4103        self.add_unary_operator_with_preference(operator, input_stream, preference)
4104    }
4105
4106    fn add_unary_operator_with_preference<I, O, Op>(
4107        &self,
4108        operator: Op,
4109        input_stream: &Stream<Self, I>,
4110        input_preference: OwnershipPreference,
4111    ) -> Stream<Self, O>
4112    where
4113        I: Data,
4114        O: Data,
4115        Op: UnaryOperator<I, O>,
4116    {
4117        self.add_node(|id| {
4118            self.log_circuit_event(&CircuitEvent::operator(
4119                GlobalNodeId::child_of(self, id),
4120                operator.name(),
4121                operator.location(),
4122            ));
4123
4124            let node = UnaryNode::new(operator, input_stream.clone(), self.clone(), id);
4125            let output_stream = node.output_stream();
4126            self.connect_stream(input_stream, id, input_preference);
4127            (node, output_stream)
4128        })
4129    }
4130
4131    fn add_binary_operator<I1, I2, O, Op>(
4132        &self,
4133        operator: Op,
4134        input_stream1: &Stream<Self, I1>,
4135        input_stream2: &Stream<Self, I2>,
4136    ) -> Stream<Self, O>
4137    where
4138        I1: Data,
4139        I2: Data,
4140        O: Data,
4141        Op: BinaryOperator<I1, I2, O>,
4142    {
4143        let (pref1, pref2) = operator.input_preference();
4144        self.add_binary_operator_with_preference(
4145            operator,
4146            (input_stream1, pref1),
4147            (input_stream2, pref2),
4148        )
4149    }
4150
4151    fn add_binary_operator_with_preference<I1, I2, O, Op>(
4152        &self,
4153        operator: Op,
4154        input_stream1: (&Stream<Self, I1>, OwnershipPreference),
4155        input_stream2: (&Stream<Self, I2>, OwnershipPreference),
4156    ) -> Stream<Self, O>
4157    where
4158        I1: Data,
4159        I2: Data,
4160        O: Data,
4161        Op: BinaryOperator<I1, I2, O>,
4162    {
4163        let (input_stream1, input_preference1) = input_stream1;
4164        let (input_stream2, input_preference2) = input_stream2;
4165
4166        self.add_node(|id| {
4167            self.log_circuit_event(&CircuitEvent::operator(
4168                GlobalNodeId::child_of(self, id),
4169                operator.name(),
4170                operator.location(),
4171            ));
4172
4173            let node = BinaryNode::new(
4174                operator,
4175                input_stream1.clone(),
4176                input_stream2.clone(),
4177                self.clone(),
4178                id,
4179            );
4180            let output_stream = node.output_stream();
4181            self.connect_stream(input_stream1, id, input_preference1);
4182            self.connect_stream(input_stream2, id, input_preference2);
4183            (node, output_stream)
4184        })
4185    }
4186
4187    fn add_ternary_operator<I1, I2, I3, O, Op>(
4188        &self,
4189        operator: Op,
4190        input_stream1: &Stream<Self, I1>,
4191        input_stream2: &Stream<Self, I2>,
4192        input_stream3: &Stream<Self, I3>,
4193    ) -> Stream<Self, O>
4194    where
4195        I1: Data,
4196        I2: Data,
4197        I3: Data,
4198        O: Data,
4199        Op: TernaryOperator<I1, I2, I3, O>,
4200    {
4201        let (pref1, pref2, pref3) = operator.input_preference();
4202        self.add_ternary_operator_with_preference(
4203            operator,
4204            (input_stream1, pref1),
4205            (input_stream2, pref2),
4206            (input_stream3, pref3),
4207        )
4208    }
4209
4210    #[allow(clippy::too_many_arguments)]
4211    fn add_ternary_operator_with_preference<I1, I2, I3, O, Op>(
4212        &self,
4213        operator: Op,
4214        input_stream1: (&Stream<Self, I1>, OwnershipPreference),
4215        input_stream2: (&Stream<Self, I2>, OwnershipPreference),
4216        input_stream3: (&Stream<Self, I3>, OwnershipPreference),
4217    ) -> Stream<Self, O>
4218    where
4219        I1: Data,
4220        I2: Data,
4221        I3: Data,
4222        O: Data,
4223        Op: TernaryOperator<I1, I2, I3, O>,
4224    {
4225        let (input_stream1, input_preference1) = input_stream1;
4226        let (input_stream2, input_preference2) = input_stream2;
4227        let (input_stream3, input_preference3) = input_stream3;
4228
4229        self.add_node(|id| {
4230            self.log_circuit_event(&CircuitEvent::operator(
4231                GlobalNodeId::child_of(self, id),
4232                operator.name(),
4233                operator.location(),
4234            ));
4235
4236            let node = TernaryNode::new(
4237                operator,
4238                input_stream1.clone(),
4239                input_stream2.clone(),
4240                input_stream3.clone(),
4241                self.clone(),
4242                id,
4243            );
4244            let output_stream = node.output_stream();
4245            self.connect_stream(input_stream1, id, input_preference1);
4246            self.connect_stream(input_stream2, id, input_preference2);
4247            self.connect_stream(input_stream3, id, input_preference3);
4248            (node, output_stream)
4249        })
4250    }
4251
4252    fn add_quaternary_operator<I1, I2, I3, I4, O, Op>(
4253        &self,
4254        operator: Op,
4255        input_stream1: &Stream<Self, I1>,
4256        input_stream2: &Stream<Self, I2>,
4257        input_stream3: &Stream<Self, I3>,
4258        input_stream4: &Stream<Self, I4>,
4259    ) -> Stream<Self, O>
4260    where
4261        I1: Data,
4262        I2: Data,
4263        I3: Data,
4264        I4: Data,
4265        O: Data,
4266        Op: QuaternaryOperator<I1, I2, I3, I4, O>,
4267    {
4268        let (pref1, pref2, pref3, pref4) = operator.input_preference();
4269        self.add_quaternary_operator_with_preference(
4270            operator,
4271            (input_stream1, pref1),
4272            (input_stream2, pref2),
4273            (input_stream3, pref3),
4274            (input_stream4, pref4),
4275        )
4276    }
4277
4278    #[allow(clippy::too_many_arguments)]
4279    fn add_quaternary_operator_with_preference<I1, I2, I3, I4, O, Op>(
4280        &self,
4281        operator: Op,
4282        input_stream1: (&Stream<Self, I1>, OwnershipPreference),
4283        input_stream2: (&Stream<Self, I2>, OwnershipPreference),
4284        input_stream3: (&Stream<Self, I3>, OwnershipPreference),
4285        input_stream4: (&Stream<Self, I4>, OwnershipPreference),
4286    ) -> Stream<Self, O>
4287    where
4288        I1: Data,
4289        I2: Data,
4290        I3: Data,
4291        I4: Data,
4292        O: Data,
4293        Op: QuaternaryOperator<I1, I2, I3, I4, O>,
4294    {
4295        let (input_stream1, input_preference1) = input_stream1;
4296        let (input_stream2, input_preference2) = input_stream2;
4297        let (input_stream3, input_preference3) = input_stream3;
4298        let (input_stream4, input_preference4) = input_stream4;
4299
4300        self.add_node(|id| {
4301            self.log_circuit_event(&CircuitEvent::operator(
4302                GlobalNodeId::child_of(self, id),
4303                operator.name(),
4304                operator.location(),
4305            ));
4306
4307            let node = QuaternaryNode::new(
4308                operator,
4309                input_stream1.clone(),
4310                input_stream2.clone(),
4311                input_stream3.clone(),
4312                input_stream4.clone(),
4313                self.clone(),
4314                id,
4315            );
4316            let output_stream = node.output_stream();
4317            self.connect_stream(input_stream1, id, input_preference1);
4318            self.connect_stream(input_stream2, id, input_preference2);
4319            self.connect_stream(input_stream3, id, input_preference3);
4320            self.connect_stream(input_stream4, id, input_preference4);
4321            (node, output_stream)
4322        })
4323    }
4324
4325    fn add_nary_operator<'a, I, O, Op, Iter>(
4326        &'a self,
4327        operator: Op,
4328        input_streams: Iter,
4329    ) -> Stream<Self, O>
4330    where
4331        I: Data,
4332        O: Data,
4333        Op: NaryOperator<I, O>,
4334        Iter: IntoIterator<Item = &'a Stream<Self, I>>,
4335    {
4336        let pref = operator.input_preference();
4337        self.add_nary_operator_with_preference(operator, input_streams, pref)
4338    }
4339
4340    fn add_nary_operator_with_preference<'a, I, O, Op, Iter>(
4341        &'a self,
4342        operator: Op,
4343        input_streams: Iter,
4344        input_preference: OwnershipPreference,
4345    ) -> Stream<Self, O>
4346    where
4347        I: Data,
4348        O: Data,
4349        Op: NaryOperator<I, O>,
4350        Iter: IntoIterator<Item = &'a Stream<Self, I>>,
4351    {
4352        let input_streams: Vec<Stream<_, _>> = input_streams.into_iter().cloned().collect();
4353        self.add_node(|id| {
4354            self.log_circuit_event(&CircuitEvent::operator(
4355                GlobalNodeId::child_of(self, id),
4356                operator.name(),
4357                operator.location(),
4358            ));
4359
4360            let node = NaryNode::new(operator, input_streams.clone(), self.clone(), id);
4361            let output_stream = node.output_stream();
4362            for stream in input_streams.iter() {
4363                self.connect_stream(stream, id, input_preference);
4364            }
4365            (node, output_stream)
4366        })
4367    }
4368
4369    #[track_caller]
4370    fn add_custom_node<N: Node, R>(
4371        &self,
4372        name: Cow<'static, str>,
4373        constructor: impl FnOnce(NodeId) -> (N, R),
4374    ) -> R {
4375        self.add_node(|id| {
4376            // This must be called before the node is created, so that the node can connect input streams to
4377            // itself without causing a panic.
4378            self.log_circuit_event(&CircuitEvent::operator(
4379                GlobalNodeId::child_of(self, id),
4380                name,
4381                Some(Location::caller()),
4382            ));
4383            let (node, res) = constructor(id);
4384            (node, res)
4385        })
4386    }
4387
4388    fn add_feedback<I, O, Op>(
4389        &self,
4390        operator: Op,
4391    ) -> (Stream<Self, O>, FeedbackConnector<Self, I, O, Op>)
4392    where
4393        I: Data,
4394        O: Data,
4395        Op: StrictUnaryOperator<I, O>,
4396    {
4397        self.add_node(|id| {
4398            self.log_circuit_event(&CircuitEvent::strict_operator_output(
4399                GlobalNodeId::child_of(self, id),
4400                operator.name(),
4401                operator.location(),
4402            ));
4403
4404            let operator = Rc::new(RefCell::new(operator));
4405            let connector = FeedbackConnector::new(id, self.clone(), operator.clone());
4406            let output_node = FeedbackOutputNode::new(operator, self.clone(), id);
4407            let local = output_node.output_stream();
4408            (output_node, (local, connector))
4409        })
4410    }
4411
4412    fn add_feedback_with_export<I, O, Op>(
4413        &self,
4414        operator: Op,
4415    ) -> (ExportStream<Self, O>, FeedbackConnector<Self, I, O, Op>)
4416    where
4417        I: Data,
4418        O: Data,
4419        Op: StrictUnaryOperator<I, O>,
4420    {
4421        self.add_node(|id| {
4422            self.log_circuit_event(&CircuitEvent::strict_operator_output(
4423                GlobalNodeId::child_of(self, id),
4424                operator.name(),
4425                operator.location(),
4426            ));
4427
4428            let operator = Rc::new(RefCell::new(operator));
4429            let connector = FeedbackConnector::new(id, self.clone(), operator.clone());
4430            let output_node = FeedbackOutputNode::with_export(operator, self.clone(), id);
4431            let local = output_node.output_stream();
4432            let export = output_node.export_stream.clone().unwrap();
4433            (output_node, (ExportStream { local, export }, connector))
4434        })
4435    }
4436
4437    /// Connect feedback loop.
4438    ///
4439    /// Returns node id of the input half of Z-1.
4440    fn connect_feedback_with_preference<I, O, Op>(
4441        &self,
4442        output_node_id: NodeId,
4443        operator: Rc<RefCell<Op>>,
4444        input_stream: &Stream<Self, I>,
4445        input_preference: OwnershipPreference,
4446    ) where
4447        I: Data,
4448        O: Data,
4449        Op: StrictUnaryOperator<I, O>,
4450    {
4451        self.add_node(|id| {
4452            self.log_circuit_event(&CircuitEvent::strict_operator_input(
4453                GlobalNodeId::child_of(self, id),
4454                output_node_id,
4455            ));
4456
4457            let output_node = FeedbackInputNode::new(operator, input_stream.clone(), id);
4458            self.connect_stream(input_stream, id, input_preference);
4459            self.add_dependency(output_node_id, id);
4460            (output_node, ())
4461        })
4462    }
4463
4464    fn iterative_subcircuit<F, V, E>(&self, child_constructor: F) -> Result<V, SchedulerError>
4465    where
4466        F: FnOnce(&mut IterativeCircuit<Self>) -> Result<(V, E), SchedulerError>,
4467        E: Executor<IterativeCircuit<Self>>,
4468    {
4469        self.try_add_node(|id| {
4470            let global_id = GlobalNodeId::child_of(self, id);
4471            self.log_circuit_event(&CircuitEvent::subcircuit(global_id.clone(), true));
4472            let mut child_circuit = ChildCircuit::with_parent(self.clone(), id);
4473            let (res, executor) = child_constructor(&mut child_circuit)?;
4474            let child = <ChildNode<IterativeCircuit<Self>>>::new::<E>(child_circuit, 1, executor);
4475            self.log_circuit_event(&CircuitEvent::subcircuit_complete(global_id));
4476            Ok((child, res))
4477        })
4478    }
4479
4480    fn non_iterative_subcircuit<F, V, E>(&self, child_constructor: F) -> Result<V, SchedulerError>
4481    where
4482        F: FnOnce(&mut NonIterativeCircuit<Self>) -> Result<(V, E), SchedulerError>,
4483        E: Executor<NonIterativeCircuit<Self>>,
4484    {
4485        self.try_add_node(|id| {
4486            let global_id = GlobalNodeId::child_of(self, id);
4487            self.log_circuit_event(&CircuitEvent::subcircuit(global_id.clone(), false));
4488            let mut child_circuit = ChildCircuit::with_parent(self.clone(), id);
4489            let (res, executor) = child_constructor(&mut child_circuit)?;
4490            let child =
4491                <ChildNode<NonIterativeCircuit<Self>>>::new::<E>(child_circuit, 0, executor);
4492            self.log_circuit_event(&CircuitEvent::subcircuit_complete(global_id));
4493            Ok((child, res))
4494        })
4495    }
4496
4497    fn iterate<F, C, V>(&self, constructor: F) -> Result<V, SchedulerError>
4498    where
4499        F: FnOnce(&mut IterativeCircuit<Self>) -> Result<(C, V), SchedulerError>,
4500        C: AsyncFn() -> Result<bool, SchedulerError> + 'static,
4501    {
4502        self.iterate_with_scheduler::<F, C, V, DynamicScheduler>(constructor)
4503    }
4504
4505    /// Add an iteratively scheduled child circuit.
4506    ///
4507    /// Similar to [`iterate`](`Self::iterate`), but with a user-specified
4508    /// [`Scheduler`] implementation.
4509    fn iterate_with_scheduler<F, C, V, S>(&self, constructor: F) -> Result<V, SchedulerError>
4510    where
4511        F: FnOnce(&mut IterativeCircuit<Self>) -> Result<(C, V), SchedulerError>,
4512        C: AsyncFn() -> Result<bool, SchedulerError> + 'static,
4513        S: Scheduler + 'static,
4514    {
4515        self.iterative_subcircuit(|child| {
4516            let (termination_check, res) = constructor(child)?;
4517            let mut executor = <IterativeExecutor<_, S>>::new(termination_check);
4518            executor.prepare(child, None)?;
4519            Ok((res, executor))
4520        })
4521    }
4522
4523    fn fixedpoint<F, V>(&self, constructor: F) -> Result<V, SchedulerError>
4524    where
4525        F: FnOnce(&mut IterativeCircuit<Self>) -> Result<V, SchedulerError>,
4526    {
4527        self.fixedpoint_with_scheduler::<F, V, DynamicScheduler>(constructor)
4528    }
4529
4530    fn fixedpoint_with_scheduler<F, V, S>(&self, constructor: F) -> Result<V, SchedulerError>
4531    where
4532        F: FnOnce(&mut IterativeCircuit<Self>) -> Result<V, SchedulerError>,
4533        S: Scheduler + 'static,
4534    {
4535        self.iterative_subcircuit(|child| {
4536            let res = constructor(child)?;
4537            let child_clone = child.clone();
4538
4539            let consensus = Consensus::new();
4540
4541            let termination_check = async move || {
4542                // Send local fixed point status to all peers.
4543                let local_fixedpoint = child_clone.inner().check_fixedpoint(0);
4544                consensus.check(local_fixedpoint).await
4545            };
4546            let mut executor = <IterativeExecutor<_, S>>::new(termination_check);
4547            executor.prepare(child, None)?;
4548            Ok((res, executor))
4549        })
4550    }
4551
4552    fn import_stream<I, O, Op>(&self, operator: Op, parent_stream: &Stream<P, I>) -> Stream<Self, O>
4553    where
4554        Self::Parent: Circuit,
4555        I: Data,
4556        O: Data,
4557        Op: ImportOperator<I, O>,
4558    {
4559        let preference = operator.input_preference();
4560        self.import_stream_with_preference(operator, parent_stream, preference)
4561    }
4562
4563    fn import_stream_with_preference<I, O, Op>(
4564        &self,
4565        operator: Op,
4566        parent_stream: &Stream<P, I>,
4567        input_preference: OwnershipPreference,
4568    ) -> Stream<Self, O>
4569    where
4570        Self::Parent: Circuit,
4571        I: Data,
4572        O: Data,
4573        Op: ImportOperator<I, O>,
4574    {
4575        assert!(self.is_child_of(parent_stream.circuit()));
4576
4577        let output_stream = self.add_node(|id| {
4578            let node_id = self.global_node_id().child(id);
4579            self.log_circuit_event(&CircuitEvent::operator(
4580                node_id.clone(),
4581                operator.name(),
4582                operator.location(),
4583            ));
4584            let node = ImportNode::new(operator, self.clone(), parent_stream.clone(), id);
4585            // Note: here the edge points to the sub-circuit, and not to the ImportNode itself.
4586            self.parent()
4587                .connect_stream(parent_stream, self.node_id(), input_preference);
4588            // Log the actual edge going to the inner node as well
4589            self.parent().log_circuit_event(&CircuitEvent::stream(
4590                parent_stream.origin_node_id().clone(),
4591                node_id.clone(),
4592                input_preference,
4593            ));
4594            let output_stream = node.output_stream();
4595            (node, output_stream)
4596        });
4597
4598        self.add_import_node(output_stream.local_node_id());
4599
4600        output_stream
4601    }
4602
4603    fn add_replay_edges(&self, stream_id: StreamId, replay_stream: &dyn StreamMetadata) {
4604        let mut edges = self.edges_mut();
4605        let mut new_edges = Vec::new();
4606
4607        let Some(edges_to_replay) = edges.get_by_stream_id(&Some(stream_id)) else {
4608            return;
4609        };
4610
4611        for edge in edges_to_replay {
4612            // println!(
4613            //     "Adding replay edge ({}) {} -> {}",
4614            //     replay_stream.origin_node_id(),
4615            //     replay_stream.local_node_id(),
4616            //     edge.to
4617            // );
4618            new_edges.push(Edge {
4619                from: replay_stream.local_node_id(),
4620                to: edge.to,
4621                origin: replay_stream.origin_node_id().clone(),
4622                stream: Some(clone_box(replay_stream)),
4623                ownership_preference: edge.ownership_preference,
4624            });
4625        }
4626
4627        edges.extend(new_edges);
4628    }
4629}
/// Node that brings a stream from the parent circuit into a child circuit.
///
/// Wraps an [`ImportOperator`] that consumes values of `parent_stream` (which
/// lives in `C::Parent`) and whose evaluation results are written to
/// `output_stream` inside `C`.
struct ImportNode<C, I, O, Op>
where
    C: Circuit,
{
    // Global id of this node (a child of the circuit's own id).
    id: GlobalNodeId,
    // The wrapped import operator.
    operator: Op,
    // Input stream, owned by the parent circuit.
    parent_stream: Stream<C::Parent, I>,
    // Output stream produced inside the child circuit.
    output_stream: Stream<C, O>,
    // Key/value labels attached through `Node::set_label`.
    labels: BTreeMap<String, String>,
}
4640
4641impl<C, I, O, Op> ImportNode<C, I, O, Op>
4642where
4643    C: Circuit,
4644    C::Parent: Circuit,
4645    I: Clone + 'static,
4646    O: Clone + 'static,
4647    Op: ImportOperator<I, O>,
4648{
4649    fn new(operator: Op, circuit: C, parent_stream: Stream<C::Parent, I>, id: NodeId) -> Self {
4650        assert!(Circuit::ptr_eq(&circuit.parent(), parent_stream.circuit()));
4651
4652        Self {
4653            id: circuit.global_node_id().child(id),
4654            operator,
4655            parent_stream,
4656            output_stream: Stream::new(circuit, id),
4657            labels: BTreeMap::new(),
4658        }
4659    }
4660
4661    fn output_stream(&self) -> Stream<C, O> {
4662        self.output_stream.clone()
4663    }
4664}
4665
/// [`Node`] implementation for [`ImportNode`].
///
/// Nearly every method forwards to the wrapped import operator; the node itself
/// only owns its id, its output stream, and its label map.
impl<C, I, O, Op> Node for ImportNode<C, I, O, Op>
where
    C: Circuit,
    C::Parent: Circuit,
    I: Clone + 'static,
    O: Clone + 'static,
    Op: ImportOperator<I, O>,
{
    fn name(&self) -> Cow<'static, str> {
        self.operator.name()
    }

    fn local_id(&self) -> NodeId {
        // `self.id` is built with `.child(id)` in `ImportNode::new`, so it is
        // expected to carry a local component.
        self.id.local_node_id().unwrap()
    }

    fn global_id(&self) -> &GlobalNodeId {
        &self.id
    }

    fn is_async(&self) -> bool {
        self.operator.is_async()
    }

    fn is_input(&self) -> bool {
        self.operator.is_input()
    }

    fn ready(&self) -> bool {
        self.operator.ready()
    }

    fn register_ready_callback(&mut self, cb: Box<dyn Fn() + Send + Sync>) {
        self.operator.register_ready_callback(cb);
    }

    // Evaluate the operator, publish its result on the output stream, and
    // report the operator's flush progress.
    fn eval<'a>(
        &'a mut self,
    ) -> Pin<Box<dyn Future<Output = Result<Option<Position>, SchedulerError>> + 'a>> {
        Box::pin(async {
            self.output_stream.put(self.operator.eval().await);
            Ok(self.operator.flush_progress())
        })
    }

    // It is safe to hold `self.parent_stream.get()` across the await point
    // because the operator is not evaluated reentrantly.
    #[allow(clippy::await_holding_refcell_ref)]
    fn import<'a>(&'a mut self) -> Pin<Box<dyn Future<Output = ()> + 'a>> {
        Box::pin(async {
            // When `StreamValue::take` yields the parent's value, hand it to the
            // operator by ownership; otherwise import it by reference via `peek`.
            match StreamValue::take(self.parent_stream.val()) {
                None => {
                    self.operator
                        .import(StreamValue::peek(&self.parent_stream.get()))
                        .await
                }
                Some(val) => self.operator.import_owned(val).await,
            }

            // NOTE(review): presumably releases this consumer's claim on the
            // parent stream value — confirm against `StreamValue::consume_token`.
            StreamValue::consume_token(self.parent_stream.val());
        })
    }

    fn start_transaction(&mut self) {
        self.operator.start_transaction();
    }

    fn flush(&mut self) {
        self.operator.flush();
    }

    fn is_flush_complete(&self) -> bool {
        self.operator.is_flush_complete()
    }

    fn clock_start(&mut self, scope: Scope) {
        self.operator.clock_start(scope);
    }

    fn clock_end(&mut self, scope: Scope) {
        self.operator.clock_end(scope);
    }

    fn init(&mut self) {
        self.operator.init(&self.id);
    }

    fn metadata(&self, output: &mut OperatorMeta) {
        self.operator.metadata(output);
    }

    fn fixedpoint(&self, scope: Scope) -> bool {
        self.operator.fixedpoint(scope)
    }

    // Forward the node's persistent id, if any, to the operator.
    fn checkpoint(
        &mut self,
        base: &StoragePath,
        files: &mut Vec<Arc<dyn FileCommitter>>,
    ) -> Result<(), DbspError> {
        self.operator
            .checkpoint(base, self.persistent_id().as_deref(), files)
    }

    // Forward the node's persistent id, if any, to the operator.
    fn restore(&mut self, base: &StoragePath) -> Result<(), DbspError> {
        self.operator.restore(base, self.persistent_id().as_deref())
    }

    fn start_compaction(&mut self) {
        self.operator.start_compaction()
    }

    fn clear_state(&mut self) -> Result<(), DbspError> {
        self.operator.clear_state()
    }

    fn start_replay(&mut self) -> Result<(), DbspError> {
        self.operator.start_replay()
    }

    fn end_replay(&mut self) -> Result<(), DbspError> {
        self.operator.end_replay()
    }

    fn is_replay_complete(&self) -> bool {
        self.operator.is_replay_complete()
    }

    // Labels are stored on the node itself, not on the operator.
    fn set_label(&mut self, key: &str, value: &str) {
        self.labels.insert(key.to_string(), value.to_string());
    }

    fn get_label(&self, key: &str) -> Option<&str> {
        self.labels.get(key).map(|s| s.as_str())
    }

    fn labels(&self) -> &BTreeMap<String, String> {
        &self.labels
    }

    fn as_any(&self) -> &dyn Any {
        self
    }
}
4810
/// Node wrapping a [`SourceOperator`]: it has no input streams and writes each
/// value the operator produces to `output_stream`.
struct SourceNode<C, O, Op> {
    // Global id of this node (a child of the circuit's own id).
    id: GlobalNodeId,
    // The wrapped source operator.
    operator: Op,
    // Stream receiving the operator's output.
    output_stream: Stream<C, O>,
    // Key/value labels attached through `Node::set_label`.
    labels: BTreeMap<String, String>,
}
4817
4818impl<C, O, Op> SourceNode<C, O, Op>
4819where
4820    Op: SourceOperator<O>,
4821    C: Circuit,
4822{
4823    fn new(operator: Op, circuit: C, id: NodeId) -> Self {
4824        Self {
4825            id: circuit.global_node_id().child(id),
4826            operator,
4827            output_stream: Stream::new(circuit, id),
4828            labels: BTreeMap::new(),
4829        }
4830    }
4831
4832    fn output_stream(&self) -> Stream<C, O> {
4833        self.output_stream.clone()
4834    }
4835}
4836
4837impl<C, O, Op> Node for SourceNode<C, O, Op>
4838where
4839    C: Circuit,
4840    O: Clone + 'static,
4841    Op: SourceOperator<O>,
4842{
4843    fn name(&self) -> Cow<'static, str> {
4844        self.operator.name()
4845    }
4846
4847    fn local_id(&self) -> NodeId {
4848        self.id.local_node_id().unwrap()
4849    }
4850
4851    fn global_id(&self) -> &GlobalNodeId {
4852        &self.id
4853    }
4854
4855    fn is_async(&self) -> bool {
4856        self.operator.is_async()
4857    }
4858
4859    fn is_input(&self) -> bool {
4860        self.operator.is_input()
4861    }
4862
4863    fn ready(&self) -> bool {
4864        self.operator.ready()
4865    }
4866
4867    fn register_ready_callback(&mut self, cb: Box<dyn Fn() + Send + Sync>) {
4868        self.operator.register_ready_callback(cb);
4869    }
4870
4871    fn eval<'a>(
4872        &'a mut self,
4873    ) -> Pin<Box<dyn Future<Output = Result<Option<Position>, SchedulerError>> + 'a>> {
4874        Box::pin(async {
4875            self.output_stream.put(self.operator.eval().await);
4876            Ok(self.operator.flush_progress())
4877        })
4878    }
4879
4880    fn start_transaction(&mut self) {
4881        self.operator.start_transaction();
4882    }
4883
4884    fn flush(&mut self) {
4885        self.operator.flush();
4886    }
4887
4888    fn is_flush_complete(&self) -> bool {
4889        self.operator.is_flush_complete()
4890    }
4891
4892    fn clock_start(&mut self, scope: Scope) {
4893        self.operator.clock_start(scope);
4894    }
4895
4896    fn clock_end(&mut self, scope: Scope) {
4897        self.operator.clock_end(scope);
4898    }
4899
4900    fn init(&mut self) {
4901        self.operator.init(&self.id);
4902    }
4903
4904    fn metadata(&self, output: &mut OperatorMeta) {
4905        self.operator.metadata(output);
4906    }
4907
4908    fn fixedpoint(&self, scope: Scope) -> bool {
4909        self.operator.fixedpoint(scope)
4910    }
4911
4912    fn checkpoint(
4913        &mut self,
4914        base: &StoragePath,
4915        files: &mut Vec<Arc<dyn FileCommitter>>,
4916    ) -> Result<(), DbspError> {
4917        self.operator
4918            .checkpoint(base, self.persistent_id().as_deref(), files)
4919    }
4920
4921    fn restore(&mut self, base: &StoragePath) -> Result<(), DbspError> {
4922        self.operator.restore(base, self.persistent_id().as_deref())
4923    }
4924
4925    fn start_compaction(&mut self) {
4926        self.operator.start_compaction()
4927    }
4928
4929    fn clear_state(&mut self) -> Result<(), DbspError> {
4930        self.operator.clear_state()
4931    }
4932
4933    fn start_replay(&mut self) -> Result<(), DbspError> {
4934        self.operator.start_replay()
4935    }
4936
4937    fn is_replay_complete(&self) -> bool {
4938        self.operator.is_replay_complete()
4939    }
4940
4941    fn end_replay(&mut self) -> Result<(), DbspError> {
4942        self.operator.end_replay()
4943    }
4944
4945    fn set_label(&mut self, key: &str, value: &str) {
4946        self.labels.insert(key.to_string(), value.to_string());
4947    }
4948
4949    fn get_label(&self, key: &str) -> Option<&str> {
4950        self.labels.get(key).map(|s| s.as_str())
4951    }
4952
4953    fn labels(&self) -> &BTreeMap<String, String> {
4954        &self.labels
4955    }
4956
4957    fn as_any(&self) -> &dyn Any {
4958        self
4959    }
4960}
4961
/// Node wrapping a [`UnaryOperator`] that reads values from `input_stream`
/// and writes the operator's results to `output_stream`.
struct UnaryNode<C, I, O, Op> {
    // Global id of this node (a child of the circuit's own id).
    id: GlobalNodeId,
    // The wrapped unary operator.
    operator: Op,
    // Stream the operator reads from.
    input_stream: Stream<C, I>,
    // Stream receiving the operator's output.
    output_stream: Stream<C, O>,
    // Key/value labels attached through `Node::set_label`.
    labels: BTreeMap<String, String>,
}
4969
4970impl<C, I, O, Op> UnaryNode<C, I, O, Op>
4971where
4972    Op: UnaryOperator<I, O>,
4973    C: Circuit,
4974{
4975    fn new(operator: Op, input_stream: Stream<C, I>, circuit: C, id: NodeId) -> Self {
4976        Self {
4977            id: circuit.global_node_id().child(id),
4978            operator,
4979            input_stream,
4980            output_stream: Stream::new(circuit, id),
4981            labels: BTreeMap::new(),
4982        }
4983    }
4984
4985    fn output_stream(&self) -> Stream<C, O> {
4986        self.output_stream.clone()
4987    }
4988}
4989
/// `Node` implementation for [`UnaryNode`].
///
/// Scheduling, lifecycle, checkpoint and replay hooks forward to the wrapped
/// operator. `eval` moves data from the input stream to the output stream.
/// Labels are stored on the node itself.
impl<C, I, O, Op> Node for UnaryNode<C, I, O, Op>
where
    C: Circuit,
    I: Clone + 'static,
    O: Clone + 'static,
    Op: UnaryOperator<I, O>,
{
    fn name(&self) -> Cow<'static, str> {
        self.operator.name()
    }

    // `id` is always built with `.child(..)` in `UnaryNode::new`, so it is
    // expected to carry a local component and the `unwrap` should not fire.
    fn local_id(&self) -> NodeId {
        self.id.local_node_id().unwrap()
    }

    fn global_id(&self) -> &GlobalNodeId {
        &self.id
    }

    fn is_async(&self) -> bool {
        self.operator.is_async()
    }

    fn is_input(&self) -> bool {
        self.operator.is_input()
    }

    fn ready(&self) -> bool {
        self.operator.ready()
    }

    fn register_ready_callback(&mut self, cb: Box<dyn Fn() + Send + Sync>) {
        self.operator.register_ready_callback(cb);
    }

    /// Evaluates the operator on the current input value and publishes the
    /// result on `output_stream`.
    ///
    /// Returns the operator's `flush_progress()` as this step's progress.
    // Justification: see StreamValue::take() comment.
    #[allow(clippy::await_holding_refcell_ref)]
    fn eval<'a>(
        &'a mut self,
    ) -> Pin<Box<dyn Future<Output = Result<Option<Position>, SchedulerError>> + 'a>> {
        Box::pin(async {
            // Prefer an owned input. If `take` hands over the value, use
            // `eval_owned`. Otherwise evaluate against a borrowed view via
            // `peek` (the `get()` borrow lives only for this match arm's
            // statement).
            self.output_stream
                .put(match StreamValue::take(self.input_stream.val()) {
                    Some(v) => self.operator.eval_owned(v).await,
                    None => {
                        self.operator
                            .eval(StreamValue::peek(&self.input_stream.get()))
                            .await
                    }
                });
            // Called on both paths, after the borrow above has been released.
            // It presumably releases this consumer's reference to the input
            // value (see `StreamValue::consume_token`).
            StreamValue::consume_token(self.input_stream.val());
            Ok(self.operator.flush_progress())
        })
    }

    // The hooks below forward unchanged to the wrapped operator.

    fn start_transaction(&mut self) {
        self.operator.start_transaction();
    }

    fn flush(&mut self) {
        self.operator.flush();
    }

    fn is_flush_complete(&self) -> bool {
        self.operator.is_flush_complete()
    }

    fn clock_start(&mut self, scope: Scope) {
        self.operator.clock_start(scope);
    }

    fn clock_end(&mut self, scope: Scope) {
        self.operator.clock_end(scope);
    }

    fn init(&mut self) {
        self.operator.init(&self.id);
    }

    fn metadata(&self, output: &mut OperatorMeta) {
        self.operator.metadata(output);
    }

    fn fixedpoint(&self, scope: Scope) -> bool {
        self.operator.fixedpoint(scope)
    }

    // Checkpoint/restore pass the node's persistent id (if any) so the
    // operator can locate its own state under `base`.
    fn checkpoint(
        &mut self,
        base: &StoragePath,
        files: &mut Vec<Arc<dyn FileCommitter>>,
    ) -> Result<(), DbspError> {
        self.operator
            .checkpoint(base, self.persistent_id().as_deref(), files)
    }

    fn restore(&mut self, base: &StoragePath) -> Result<(), DbspError> {
        self.operator.restore(base, self.persistent_id().as_deref())
    }

    fn start_compaction(&mut self) {
        self.operator.start_compaction()
    }

    fn clear_state(&mut self) -> Result<(), DbspError> {
        self.operator.clear_state()
    }

    fn start_replay(&mut self) -> Result<(), DbspError> {
        self.operator.start_replay()
    }

    fn is_replay_complete(&self) -> bool {
        self.operator.is_replay_complete()
    }

    fn end_replay(&mut self) -> Result<(), DbspError> {
        self.operator.end_replay()
    }

    // Labels are kept on the node; the operator never sees them.

    fn set_label(&mut self, key: &str, value: &str) {
        self.labels.insert(key.to_string(), value.to_string());
    }

    fn get_label(&self, key: &str) -> Option<&str> {
        self.labels.get(key).map(|s| s.as_str())
    }

    fn labels(&self) -> &BTreeMap<String, String> {
        &self.labels
    }

    fn as_any(&self) -> &dyn Any {
        self
    }
}
5126
/// Circuit node wrapping a [`SinkOperator`]: it consumes one input stream and
/// produces no output stream.
struct SinkNode<C, I, Op> {
    /// Global id of this node: the parent circuit's global id extended with
    /// the node's local id (see `SinkNode::new`).
    id: GlobalNodeId,
    /// The wrapped operator; almost every `Node` method delegates to it.
    operator: Op,
    /// Stream the sink consumes.
    input_stream: Stream<C, I>,
    /// Key/value labels attached through `Node::set_label`.
    labels: BTreeMap<String, String>,
}
5133
5134impl<C, I, Op> SinkNode<C, I, Op>
5135where
5136    Op: SinkOperator<I>,
5137    C: Circuit,
5138{
5139    fn new(operator: Op, input_stream: Stream<C, I>, circuit: C, id: NodeId) -> Self {
5140        Self {
5141            id: circuit.global_node_id().child(id),
5142            operator,
5143            input_stream,
5144            labels: BTreeMap::new(),
5145        }
5146    }
5147}
5148
/// `Node` implementation for [`SinkNode`].
///
/// Scheduling, lifecycle, checkpoint and replay hooks forward to the wrapped
/// operator. `eval` feeds the current input value to the sink. Labels are
/// stored on the node itself.
impl<C, I, Op> Node for SinkNode<C, I, Op>
where
    C: Circuit,
    I: Clone + 'static,
    Op: SinkOperator<I>,
{
    fn name(&self) -> Cow<'static, str> {
        self.operator.name()
    }

    // `id` is always built with `.child(..)` in `SinkNode::new`, so it is
    // expected to carry a local component and the `unwrap` should not fire.
    fn local_id(&self) -> NodeId {
        self.id.local_node_id().unwrap()
    }

    fn global_id(&self) -> &GlobalNodeId {
        &self.id
    }

    fn is_async(&self) -> bool {
        self.operator.is_async()
    }

    fn is_input(&self) -> bool {
        self.operator.is_input()
    }

    fn ready(&self) -> bool {
        self.operator.ready()
    }

    fn register_ready_callback(&mut self, cb: Box<dyn Fn() + Send + Sync>) {
        self.operator.register_ready_callback(cb);
    }

    /// Feeds the current input value to the sink.
    ///
    /// Returns the operator's `flush_progress()` as this step's progress.
    // Justification: see StreamValue::take() comment.
    #[allow(clippy::await_holding_refcell_ref)]
    fn eval<'a>(
        &'a mut self,
    ) -> Pin<Box<dyn Future<Output = Result<Option<Position>, SchedulerError>> + 'a>> {
        Box::pin(async {
            // Prefer an owned input (`eval_owned`). Fall back to a borrowed
            // view via `peek` when `take` does not hand over the value.
            match StreamValue::take(self.input_stream.val()) {
                Some(v) => self.operator.eval_owned(v).await,
                None => {
                    self.operator
                        .eval(StreamValue::peek(&self.input_stream.get()))
                        .await
                }
            };
            // Called on both paths, after the borrow above has been released.
            // It presumably releases this consumer's reference to the input
            // value (see `StreamValue::consume_token`).
            StreamValue::consume_token(self.input_stream.val());

            Ok(self.operator.flush_progress())
        })
    }

    // The hooks below forward unchanged to the wrapped operator.

    fn start_transaction(&mut self) {
        self.operator.start_transaction();
    }

    fn flush(&mut self) {
        self.operator.flush();
    }

    fn is_flush_complete(&self) -> bool {
        self.operator.is_flush_complete()
    }

    fn clock_start(&mut self, scope: Scope) {
        self.operator.clock_start(scope);
    }

    fn clock_end(&mut self, scope: Scope) {
        self.operator.clock_end(scope);
    }

    fn init(&mut self) {
        self.operator.init(&self.id);
    }

    fn metadata(&self, output: &mut OperatorMeta) {
        self.operator.metadata(output);
    }

    fn fixedpoint(&self, scope: Scope) -> bool {
        self.operator.fixedpoint(scope)
    }

    // Checkpoint/restore pass the node's persistent id (if any) so the
    // operator can locate its own state under `base`.
    fn checkpoint(
        &mut self,
        base: &StoragePath,
        files: &mut Vec<Arc<dyn FileCommitter>>,
    ) -> Result<(), DbspError> {
        self.operator
            .checkpoint(base, self.persistent_id().as_deref(), files)
    }

    fn restore(&mut self, base: &StoragePath) -> Result<(), DbspError> {
        self.operator.restore(base, self.persistent_id().as_deref())
    }

    fn start_compaction(&mut self) {
        self.operator.start_compaction()
    }

    fn clear_state(&mut self) -> Result<(), DbspError> {
        self.operator.clear_state()
    }

    fn start_replay(&mut self) -> Result<(), DbspError> {
        self.operator.start_replay()
    }

    fn is_replay_complete(&self) -> bool {
        self.operator.is_replay_complete()
    }

    fn end_replay(&mut self) -> Result<(), DbspError> {
        self.operator.end_replay()
    }

    // Labels are kept on the node; the operator never sees them.

    fn set_label(&mut self, key: &str, value: &str) {
        self.labels.insert(key.to_string(), value.to_string());
    }

    fn get_label(&self, key: &str) -> Option<&str> {
        self.labels.get(key).map(|s| s.as_str())
    }

    fn labels(&self) -> &BTreeMap<String, String> {
        &self.labels
    }

    fn as_any(&self) -> &dyn Any {
        self
    }
}
5284
/// Circuit node wrapping a [`BinarySinkOperator`]: it consumes two input
/// streams and produces no output stream.
struct BinarySinkNode<C, I1, I2, Op> {
    /// Global id of this node: the parent circuit's global id extended with
    /// the node's local id (see `BinarySinkNode::new`).
    id: GlobalNodeId,
    /// The wrapped operator; almost every `Node` method delegates to it.
    operator: Op,
    /// First input stream.
    input_stream1: Stream<C, I1>,
    /// Second input stream.
    input_stream2: Stream<C, I2>,
    /// `true` if both input streams are aliases of the same stream (computed
    /// once with `Stream::ptr_eq` in `new`). `eval` then borrows both inputs
    /// instead of taking ownership of either.
    is_alias: bool,
    /// Key/value labels attached through `Node::set_label`.
    labels: BTreeMap<String, String>,
}
5294
5295impl<C, I1, I2, Op> BinarySinkNode<C, I1, I2, Op>
5296where
5297    I1: Clone,
5298    I2: Clone,
5299    Op: BinarySinkOperator<I1, I2>,
5300    C: Circuit,
5301{
5302    fn new(
5303        operator: Op,
5304        input_stream1: Stream<C, I1>,
5305        input_stream2: Stream<C, I2>,
5306        circuit: C,
5307        id: NodeId,
5308    ) -> Self {
5309        let is_alias = input_stream1.ptr_eq(&input_stream2);
5310        Self {
5311            id: circuit.global_node_id().child(id),
5312            operator,
5313            input_stream1,
5314            input_stream2,
5315            is_alias,
5316            labels: BTreeMap::new(),
5317        }
5318    }
5319}
5320
/// `Node` implementation for [`BinarySinkNode`].
///
/// Scheduling, lifecycle, checkpoint and replay hooks forward to the wrapped
/// operator. `eval` feeds both current input values to the sink as `Cow`s
/// (owned where possible). Labels are stored on the node itself.
impl<C, I1, I2, Op> Node for BinarySinkNode<C, I1, I2, Op>
where
    C: Circuit,
    I1: Clone + 'static,
    I2: Clone + 'static,
    Op: BinarySinkOperator<I1, I2>,
{
    fn name(&self) -> Cow<'static, str> {
        self.operator.name()
    }

    // `id` is always built with `.child(..)` in `BinarySinkNode::new`, so it
    // is expected to carry a local component and the `unwrap` should not fire.
    fn local_id(&self) -> NodeId {
        self.id.local_node_id().unwrap()
    }

    fn global_id(&self) -> &GlobalNodeId {
        &self.id
    }

    fn is_async(&self) -> bool {
        self.operator.is_async()
    }

    fn is_input(&self) -> bool {
        self.operator.is_input()
    }

    fn ready(&self) -> bool {
        self.operator.ready()
    }

    fn register_ready_callback(&mut self, cb: Box<dyn Fn() + Send + Sync>) {
        self.operator.register_ready_callback(cb);
    }

    /// Feeds both current input values to the sink.
    ///
    /// Returns the operator's `flush_progress()` as this step's progress.
    // Justification: see StreamValue::take() comment.
    #[allow(clippy::await_holding_refcell_ref)]
    fn eval<'a>(
        &'a mut self,
    ) -> Pin<Box<dyn Future<Output = Result<Option<Position>, SchedulerError>> + 'a>> {
        Box::pin(async {
            if self.is_alias {
                // Both inputs are the same stream: taking the value through
                // one handle would invalidate the borrow through the other.
                // Borrow both, and keep the borrows in an inner scope so they
                // are released before the tokens are consumed.
                {
                    let val1 = self.input_stream1.get();
                    let val2 = self.input_stream2.get();
                    self.operator
                        .eval(
                            Cow::Borrowed(StreamValue::peek(&val1)),
                            Cow::Borrowed(StreamValue::peek(&val2)),
                        )
                        .await;
                }

                // One token per input handle, even though both refer to the
                // same stream.
                StreamValue::consume_token(self.input_stream1.val());
                StreamValue::consume_token(self.input_stream2.val());
            } else {
                // Distinct streams: try to take ownership of each input
                // independently, and borrow (via `peek`) any input that could
                // not be taken.
                let val1 = StreamValue::take(self.input_stream1.val());
                let val2 = StreamValue::take(self.input_stream2.val());

                match (val1, val2) {
                    (Some(val1), Some(val2)) => {
                        self.operator.eval(Cow::Owned(val1), Cow::Owned(val2)).await;
                    }
                    (Some(val1), None) => {
                        self.operator
                            .eval(
                                Cow::Owned(val1),
                                Cow::Borrowed(StreamValue::peek(&self.input_stream2.get())),
                            )
                            .await;
                    }
                    (None, Some(val2)) => {
                        self.operator
                            .eval(
                                Cow::Borrowed(StreamValue::peek(&self.input_stream1.get())),
                                Cow::Owned(val2),
                            )
                            .await;
                    }
                    (None, None) => {
                        self.operator
                            .eval(
                                Cow::Borrowed(StreamValue::peek(&self.input_stream1.get())),
                                Cow::Borrowed(StreamValue::peek(&self.input_stream2.get())),
                            )
                            .await;
                    }
                }

                // Called on every path, after all `get()` borrows above have
                // ended.
                StreamValue::consume_token(self.input_stream1.val());
                StreamValue::consume_token(self.input_stream2.val());
            };

            Ok(self.operator.flush_progress())
        })
    }

    // The hooks below forward unchanged to the wrapped operator.

    fn start_transaction(&mut self) {
        self.operator.start_transaction();
    }

    fn flush(&mut self) {
        self.operator.flush();
    }

    fn is_flush_complete(&self) -> bool {
        self.operator.is_flush_complete()
    }

    fn clock_start(&mut self, scope: Scope) {
        self.operator.clock_start(scope);
    }

    fn clock_end(&mut self, scope: Scope) {
        self.operator.clock_end(scope);
    }

    fn init(&mut self) {
        self.operator.init(&self.id);
    }

    fn metadata(&self, output: &mut OperatorMeta) {
        self.operator.metadata(output);
    }

    fn fixedpoint(&self, scope: Scope) -> bool {
        self.operator.fixedpoint(scope)
    }

    // Checkpoint/restore pass the node's persistent id (if any) so the
    // operator can locate its own state under `base`.
    fn checkpoint(
        &mut self,
        base: &StoragePath,
        files: &mut Vec<Arc<dyn FileCommitter>>,
    ) -> Result<(), DbspError> {
        self.operator
            .checkpoint(base, self.persistent_id().as_deref(), files)
    }

    fn restore(&mut self, base: &StoragePath) -> Result<(), DbspError> {
        self.operator.restore(base, self.persistent_id().as_deref())
    }

    fn start_compaction(&mut self) {
        self.operator.start_compaction()
    }

    fn clear_state(&mut self) -> Result<(), DbspError> {
        self.operator.clear_state()
    }

    fn start_replay(&mut self) -> Result<(), DbspError> {
        self.operator.start_replay()
    }

    fn is_replay_complete(&self) -> bool {
        self.operator.is_replay_complete()
    }

    fn end_replay(&mut self) -> Result<(), DbspError> {
        self.operator.end_replay()
    }

    // Labels are kept on the node; the operator never sees them.

    fn set_label(&mut self, key: &str, value: &str) {
        self.labels.insert(key.to_string(), value.to_string());
    }

    fn get_label(&self, key: &str) -> Option<&str> {
        self.labels.get(key).map(|s| s.as_str())
    }

    fn labels(&self) -> &BTreeMap<String, String> {
        &self.labels
    }

    fn as_any(&self) -> &dyn Any {
        self
    }
}
5499
/// Circuit node wrapping a [`TernarySinkOperator`]: it consumes three input
/// streams and produces no output stream.
///
/// Unlike `BinarySinkNode` there is no alias flag. `TernarySinkNode::new`
/// instead asserts that the three inputs are distinct streams.
struct TernarySinkNode<C, I1, I2, I3, Op> {
    /// Global id of this node: the parent circuit's global id extended with
    /// the node's local id.
    id: GlobalNodeId,
    /// The wrapped operator; almost every `Node` method delegates to it.
    operator: Op,
    /// First input stream.
    input_stream1: Stream<C, I1>,
    /// Second input stream.
    input_stream2: Stream<C, I2>,
    /// Third input stream.
    input_stream3: Stream<C, I3>,
    /// Key/value labels attached through `Node::set_label`.
    labels: BTreeMap<String, String>,
}
5508
5509impl<C, I1, I2, I3, Op> TernarySinkNode<C, I1, I2, I3, Op>
5510where
5511    I1: Clone,
5512    I2: Clone,
5513    I3: Clone,
5514    Op: TernarySinkOperator<I1, I2, I3>,
5515    C: Circuit,
5516{
5517    fn new(
5518        operator: Op,
5519        input_stream1: Stream<C, I1>,
5520        input_stream2: Stream<C, I2>,
5521        input_stream3: Stream<C, I3>,
5522        circuit: C,
5523        id: NodeId,
5524    ) -> Self {
5525        assert!(!input_stream1.ptr_eq(&input_stream2));
5526        assert!(!input_stream1.ptr_eq(&input_stream3));
5527        assert!(!input_stream2.ptr_eq(&input_stream3));
5528
5529        Self {
5530            id: circuit.global_node_id().child(id),
5531            operator,
5532            input_stream1,
5533            input_stream2,
5534            input_stream3,
5535            labels: BTreeMap::new(),
5536        }
5537    }
5538}
5539
/// `Node` implementation for [`TernarySinkNode`].
///
/// Scheduling, lifecycle, checkpoint and replay hooks forward to the wrapped
/// operator. `eval` feeds the three current input values to the sink as
/// `Cow`s (owned where possible). Labels are stored on the node itself.
impl<C, I1, I2, I3, Op> Node for TernarySinkNode<C, I1, I2, I3, Op>
where
    C: Circuit,
    I1: Clone + 'static,
    I2: Clone + 'static,
    I3: Clone + 'static,
    Op: TernarySinkOperator<I1, I2, I3>,
{
    fn name(&self) -> Cow<'static, str> {
        self.operator.name()
    }

    // `id` is always built with `.child(..)` in `TernarySinkNode::new`, so it
    // is expected to carry a local component and the `unwrap` should not fire.
    fn local_id(&self) -> NodeId {
        self.id.local_node_id().unwrap()
    }

    fn global_id(&self) -> &GlobalNodeId {
        &self.id
    }

    fn is_async(&self) -> bool {
        self.operator.is_async()
    }

    fn is_input(&self) -> bool {
        self.operator.is_input()
    }

    fn ready(&self) -> bool {
        self.operator.ready()
    }

    fn register_ready_callback(&mut self, cb: Box<dyn Fn() + Send + Sync>) {
        self.operator.register_ready_callback(cb);
    }

    /// Feeds the three current input values to the sink.
    ///
    /// Returns the operator's `flush_progress()` as this step's progress.
    // Justification: see StreamValue::take() comment.
    #[allow(clippy::await_holding_refcell_ref)]
    fn eval<'a>(
        &'a mut self,
    ) -> Pin<Box<dyn Future<Output = Result<Option<Position>, SchedulerError>> + 'a>> {
        Box::pin(async {
            // For each input, first try to take ownership, then grab a shared
            // borrow (`get`). The borrow is only dereferenced via `peek` when
            // the value was not taken. Interleaving take/get per stream relies
            // on the three inputs being distinct, which `new` asserts.
            let val1 = StreamValue::take(self.input_stream1.val()).map(|val| Cow::Owned(val));
            let r1 = self.input_stream1.get();
            let val2 = StreamValue::take(self.input_stream2.val()).map(|val| Cow::Owned(val));
            let r2 = self.input_stream2.get();
            let val3 = StreamValue::take(self.input_stream3.val()).map(|val| Cow::Owned(val));
            let r3 = self.input_stream3.get();

            // `unwrap_or_else` keeps `peek` lazy: it only runs for inputs
            // that are borrowed rather than owned.
            self.operator
                .eval(
                    val1.unwrap_or_else(|| Cow::Borrowed(StreamValue::peek(&r1))),
                    val2.unwrap_or_else(|| Cow::Borrowed(StreamValue::peek(&r2))),
                    val3.unwrap_or_else(|| Cow::Borrowed(StreamValue::peek(&r3))),
                )
                .await;

            // The borrows must be released explicitly before the tokens are
            // consumed below.
            drop(r1);
            drop(r2);
            drop(r3);

            StreamValue::consume_token(self.input_stream1.val());
            StreamValue::consume_token(self.input_stream2.val());
            StreamValue::consume_token(self.input_stream3.val());

            Ok(self.operator.flush_progress())
        })
    }

    // The hooks below forward unchanged to the wrapped operator.

    fn start_transaction(&mut self) {
        self.operator.start_transaction();
    }

    fn flush(&mut self) {
        self.operator.flush();
    }

    fn is_flush_complete(&self) -> bool {
        self.operator.is_flush_complete()
    }

    fn clock_start(&mut self, scope: Scope) {
        self.operator.clock_start(scope);
    }

    fn clock_end(&mut self, scope: Scope) {
        self.operator.clock_end(scope);
    }

    fn init(&mut self) {
        self.operator.init(&self.id);
    }

    fn metadata(&self, output: &mut OperatorMeta) {
        self.operator.metadata(output);
    }

    fn fixedpoint(&self, scope: Scope) -> bool {
        self.operator.fixedpoint(scope)
    }

    // Checkpoint/restore pass the node's persistent id (if any) so the
    // operator can locate its own state under `base`.
    fn checkpoint(
        &mut self,
        base: &StoragePath,
        files: &mut Vec<Arc<dyn FileCommitter>>,
    ) -> Result<(), DbspError> {
        self.operator
            .checkpoint(base, self.persistent_id().as_deref(), files)
    }

    fn restore(&mut self, base: &StoragePath) -> Result<(), DbspError> {
        self.operator.restore(base, self.persistent_id().as_deref())
    }

    fn start_compaction(&mut self) {
        self.operator.start_compaction()
    }

    fn clear_state(&mut self) -> Result<(), DbspError> {
        self.operator.clear_state()
    }

    fn start_replay(&mut self) -> Result<(), DbspError> {
        self.operator.start_replay()
    }

    fn is_replay_complete(&self) -> bool {
        self.operator.is_replay_complete()
    }

    fn end_replay(&mut self) -> Result<(), DbspError> {
        self.operator.end_replay()
    }

    // Labels are kept on the node; the operator never sees them.

    fn set_label(&mut self, key: &str, value: &str) {
        self.labels.insert(key.to_string(), value.to_string());
    }

    fn get_label(&self, key: &str) -> Option<&str> {
        self.labels.get(key).map(|s| s.as_str())
    }

    fn labels(&self) -> &BTreeMap<String, String> {
        &self.labels
    }

    fn as_any(&self) -> &dyn Any {
        self
    }
}
5690
/// Circuit node wrapping a [`BinaryOperator`]: it reads two input streams and
/// writes the operator's result to an output stream owned by this node.
struct BinaryNode<C, I1, I2, O, Op> {
    /// Global id of this node: the parent circuit's global id extended with
    /// the node's local id (see `BinaryNode::new`).
    id: GlobalNodeId,
    /// The wrapped operator; almost every `Node` method delegates to it.
    operator: Op,
    /// First input stream.
    input_stream1: Stream<C, I1>,
    /// Second input stream.
    input_stream2: Stream<C, I2>,
    /// Stream the operator's result is `put` into on every `eval`.
    output_stream: Stream<C, O>,
    /// `true` if both input streams are aliases of the same stream (computed
    /// once with `Stream::ptr_eq` in `new`).
    is_alias: bool,
    /// Key/value labels attached through `Node::set_label`.
    labels: BTreeMap<String, String>,
}
5701
5702impl<C, I1, I2, O, Op> BinaryNode<C, I1, I2, O, Op>
5703where
5704    Op: BinaryOperator<I1, I2, O>,
5705    C: Circuit,
5706{
5707    fn new(
5708        operator: Op,
5709        input_stream1: Stream<C, I1>,
5710        input_stream2: Stream<C, I2>,
5711        circuit: C,
5712        id: NodeId,
5713    ) -> Self {
5714        let is_alias = input_stream1.ptr_eq(&input_stream2);
5715        Self {
5716            id: circuit.global_node_id().child(id),
5717            operator,
5718            input_stream1,
5719            input_stream2,
5720            is_alias,
5721            output_stream: Stream::new(circuit, id),
5722            labels: BTreeMap::new(),
5723        }
5724    }
5725
5726    fn output_stream(&self) -> Stream<C, O> {
5727        self.output_stream.clone()
5728    }
5729}
5730
/// `Node` implementation for [`BinaryNode`].
///
/// Scheduling, lifecycle, checkpoint and replay hooks forward to the wrapped
/// operator. `eval` combines both inputs and publishes the result on
/// `output_stream`. Labels are stored on the node itself.
impl<C, I1, I2, O, Op> Node for BinaryNode<C, I1, I2, O, Op>
where
    C: Circuit,
    I1: Clone + 'static,
    I2: Clone + 'static,
    O: Clone + 'static,
    Op: BinaryOperator<I1, I2, O>,
{
    fn name(&self) -> Cow<'static, str> {
        self.operator.name()
    }

    // `id` is always built with `.child(..)` in `BinaryNode::new`, so it is
    // expected to carry a local component and the `unwrap` should not fire.
    fn local_id(&self) -> NodeId {
        self.id.local_node_id().unwrap()
    }

    fn global_id(&self) -> &GlobalNodeId {
        &self.id
    }

    fn is_async(&self) -> bool {
        self.operator.is_async()
    }

    fn is_input(&self) -> bool {
        self.operator.is_input()
    }

    fn ready(&self) -> bool {
        self.operator.ready()
    }

    fn register_ready_callback(&mut self, cb: Box<dyn Fn() + Send + Sync>) {
        self.operator.register_ready_callback(cb);
    }

    /// Evaluates the operator on both current input values and publishes the
    /// result on `output_stream`.
    ///
    /// Picks the `eval*` variant matching which inputs could be taken by
    /// value. Returns the operator's `flush_progress()` as this step's
    /// progress.
    // Justification: see StreamValue::take() comment.
    #[allow(clippy::await_holding_refcell_ref)]
    fn eval<'a>(
        &'a mut self,
    ) -> Pin<Box<dyn Future<Output = Result<Option<Position>, SchedulerError>> + 'a>> {
        Box::pin(async {
            // If the two input streams are aliases, we cannot remove the owned
            // value from `input_stream2`, as this will invalidate the borrow
            // from `input_stream1`.  Instead use `peek` to obtain the value by
            // reference.
            if self.is_alias {
                {
                    let val1 = self.input_stream1.get();
                    let val2 = self.input_stream2.get();

                    self.output_stream.put(
                        self.operator
                            .eval(StreamValue::peek(&val1), StreamValue::peek(&val2))
                            .await,
                    );
                }
                // The borrows above have been released, so it is now safe to
                // call `consume_token`, and we must do so (once per input
                // handle) to decrement the ref counter.
                StreamValue::consume_token(self.input_stream1.val());
                StreamValue::consume_token(self.input_stream2.val());
            } else {
                // Distinct streams: try to take ownership of each input
                // independently, and borrow (via `peek`) any input that could
                // not be taken.
                let val1 = StreamValue::take(self.input_stream1.val());
                let val2 = StreamValue::take(self.input_stream2.val());

                self.output_stream.put(match (val1, val2) {
                    (Some(val1), Some(val2)) => self.operator.eval_owned(val1, val2).await,
                    (Some(val1), None) => {
                        self.operator
                            .eval_owned_and_ref(val1, StreamValue::peek(&self.input_stream2.get()))
                            .await
                    }
                    (None, Some(val2)) => {
                        self.operator
                            .eval_ref_and_owned(StreamValue::peek(&self.input_stream1.get()), val2)
                            .await
                    }
                    (None, None) => {
                        self.operator
                            .eval(
                                StreamValue::peek(&self.input_stream1.get()),
                                StreamValue::peek(&self.input_stream2.get()),
                            )
                            .await
                    }
                });
                // Called on every path, after all `get()` borrows above have
                // ended.
                StreamValue::consume_token(self.input_stream1.val());
                StreamValue::consume_token(self.input_stream2.val());
            }
            Ok(self.operator.flush_progress())
        })
    }

    // The hooks below forward unchanged to the wrapped operator.

    fn start_transaction(&mut self) {
        self.operator.start_transaction();
    }

    fn flush(&mut self) {
        self.operator.flush();
    }

    fn is_flush_complete(&self) -> bool {
        self.operator.is_flush_complete()
    }

    fn clock_start(&mut self, scope: Scope) {
        self.operator.clock_start(scope);
    }

    fn clock_end(&mut self, scope: Scope) {
        self.operator.clock_end(scope);
    }

    fn init(&mut self) {
        self.operator.init(&self.id);
    }

    fn metadata(&self, output: &mut OperatorMeta) {
        self.operator.metadata(output);
    }

    fn fixedpoint(&self, scope: Scope) -> bool {
        self.operator.fixedpoint(scope)
    }

    // Checkpoint/restore pass the node's persistent id (if any) so the
    // operator can locate its own state under `base`.
    fn checkpoint(
        &mut self,
        base: &StoragePath,
        files: &mut Vec<Arc<dyn FileCommitter>>,
    ) -> Result<(), DbspError> {
        self.operator
            .checkpoint(base, self.persistent_id().as_deref(), files)
    }

    fn restore(&mut self, base: &StoragePath) -> Result<(), DbspError> {
        self.operator.restore(base, self.persistent_id().as_deref())
    }

    fn start_compaction(&mut self) {
        self.operator.start_compaction()
    }

    fn clear_state(&mut self) -> Result<(), DbspError> {
        self.operator.clear_state()
    }

    fn start_replay(&mut self) -> Result<(), DbspError> {
        self.operator.start_replay()
    }

    fn is_replay_complete(&self) -> bool {
        self.operator.is_replay_complete()
    }

    fn end_replay(&mut self) -> Result<(), DbspError> {
        self.operator.end_replay()
    }

    // Labels are kept on the node; the operator never sees them.

    fn set_label(&mut self, key: &str, value: &str) {
        self.labels.insert(key.to_string(), value.to_string());
    }

    fn get_label(&self, key: &str) -> Option<&str> {
        self.labels.get(key).map(|s| s.as_str())
    }

    fn labels(&self) -> &BTreeMap<String, String> {
        &self.labels
    }

    fn as_any(&self) -> &dyn Any {
        self
    }
}
5905
/// Circuit node wrapping a [`TernaryOperator`]: it reads three input streams
/// and writes the operator's result to an output stream owned by this node.
struct TernaryNode<C, I1, I2, I3, O, Op> {
    /// Global id of this node: the parent circuit's global id extended with
    /// the node's local id (see `TernaryNode::new`).
    id: GlobalNodeId,
    /// The wrapped operator; almost every `Node` method delegates to it.
    operator: Op,
    /// First input stream.
    input_stream1: Stream<C, I1>,
    /// Second input stream.
    input_stream2: Stream<C, I2>,
    /// Third input stream.
    input_stream3: Stream<C, I3>,
    /// Stream the operator's result is written to; created in the parent
    /// circuit under this node's local id.
    output_stream: Stream<C, O>,
    /// Key/value labels attached through `Node::set_label`.
    labels: BTreeMap<String, String>,
}
5915
5916impl<C, I1, I2, I3, O, Op> TernaryNode<C, I1, I2, I3, O, Op>
5917where
5918    I1: Clone,
5919    I2: Clone,
5920    I3: Clone,
5921    Op: TernaryOperator<I1, I2, I3, O>,
5922    C: Circuit,
5923{
5924    fn new(
5925        operator: Op,
5926        input_stream1: Stream<C, I1>,
5927        input_stream2: Stream<C, I2>,
5928        input_stream3: Stream<C, I3>,
5929        circuit: C,
5930        id: NodeId,
5931    ) -> Self {
5932        Self {
5933            id: circuit.global_node_id().child(id),
5934            operator,
5935            input_stream1,
5936            input_stream2,
5937            input_stream3,
5938            // is_alias1,
5939            // is_alias2,
5940            output_stream: Stream::new(circuit, id),
5941            labels: BTreeMap::new(),
5942        }
5943    }
5944
5945    fn output_stream(&self) -> Stream<C, O> {
5946        self.output_stream.clone()
5947    }
5948}
5949
// `Node` adapter for a `TernaryOperator`: lifecycle, checkpoint, compaction and
// replay hooks are forwarded to the wrapped operator unchanged, while `eval`
// reads the three input streams and writes the result to `output_stream`.
impl<C, I1, I2, I3, O, Op> Node for TernaryNode<C, I1, I2, I3, O, Op>
where
    C: Circuit,
    I1: Clone + 'static,
    I2: Clone + 'static,
    I3: Clone + 'static,
    O: Clone + 'static,
    Op: TernaryOperator<I1, I2, I3, O>,
{
    fn name(&self) -> Cow<'static, str> {
        self.operator.name()
    }

    fn local_id(&self) -> NodeId {
        // Panics if the global id has no local component.
        self.id.local_node_id().unwrap()
    }

    fn global_id(&self) -> &GlobalNodeId {
        &self.id
    }

    fn is_async(&self) -> bool {
        self.operator.is_async()
    }

    fn is_input(&self) -> bool {
        self.operator.is_input()
    }

    fn ready(&self) -> bool {
        self.operator.ready()
    }

    fn register_ready_callback(&mut self, cb: Box<dyn Fn() + Send + Sync>) {
        self.operator.register_ready_callback(cb);
    }

    // Evaluates the operator on borrowed views of all three inputs (this node
    // never takes ownership of input values) and publishes the result.
    // Justification: see StreamValue::take() comment.
    #[allow(clippy::await_holding_refcell_ref)]
    fn eval<'a>(
        &'a mut self,
    ) -> Pin<Box<dyn Future<Output = Result<Option<Position>, SchedulerError>> + 'a>> {
        Box::pin(async {
            // The values returned by `get()` are RefCell borrows (hence the clippy
            // allow above) that live across the `.await`. This block makes it
            // explicit that they are released before the tokens are consumed below.
            {
                self.output_stream.put(
                    self.operator
                        .eval(
                            Cow::Borrowed(StreamValue::peek(&self.input_stream1.get())),
                            Cow::Borrowed(StreamValue::peek(&self.input_stream2.get())),
                            Cow::Borrowed(StreamValue::peek(&self.input_stream3.get())),
                        )
                        .await,
                );
            }

            // Consume this node's token on each input value; see
            // `StreamValue::consume_token` for the exact semantics.
            StreamValue::consume_token(self.input_stream1.val());
            StreamValue::consume_token(self.input_stream2.val());
            StreamValue::consume_token(self.input_stream3.val());

            // Report whatever flush progress the operator has made, if any.
            Ok(self.operator.flush_progress())
        })
    }

    fn start_transaction(&mut self) {
        self.operator.start_transaction();
    }

    fn flush(&mut self) {
        self.operator.flush();
    }

    fn is_flush_complete(&self) -> bool {
        self.operator.is_flush_complete()
    }

    fn clock_start(&mut self, scope: Scope) {
        self.operator.clock_start(scope);
    }

    fn clock_end(&mut self, scope: Scope) {
        self.operator.clock_end(scope);
    }

    fn init(&mut self) {
        self.operator.init(&self.id);
    }

    fn metadata(&self, output: &mut OperatorMeta) {
        self.operator.metadata(output);
    }

    fn fixedpoint(&self, scope: Scope) -> bool {
        self.operator.fixedpoint(scope)
    }

    // Checkpoints operator state under `base`, identified by this node's
    // persistent id (if it has one). The operator may push files that need
    // committing into `files`.
    fn checkpoint(
        &mut self,
        base: &StoragePath,
        files: &mut Vec<Arc<dyn FileCommitter>>,
    ) -> Result<(), DbspError> {
        self.operator
            .checkpoint(base, self.persistent_id().as_deref(), files)
    }

    // Restores operator state from `base`, using the same persistent id as
    // `checkpoint`.
    fn restore(&mut self, base: &StoragePath) -> Result<(), DbspError> {
        self.operator.restore(base, self.persistent_id().as_deref())
    }

    fn start_compaction(&mut self) {
        self.operator.start_compaction()
    }

    fn clear_state(&mut self) -> Result<(), DbspError> {
        self.operator.clear_state()
    }

    fn start_replay(&mut self) -> Result<(), DbspError> {
        self.operator.start_replay()
    }

    fn is_replay_complete(&self) -> bool {
        self.operator.is_replay_complete()
    }

    fn end_replay(&mut self) -> Result<(), DbspError> {
        self.operator.end_replay()
    }

    fn set_label(&mut self, key: &str, value: &str) {
        self.labels.insert(key.to_string(), value.to_string());
    }

    fn get_label(&self, key: &str) -> Option<&str> {
        self.labels.get(key).map(|s| s.as_str())
    }

    fn labels(&self) -> &BTreeMap<String, String> {
        &self.labels
    }

    fn as_any(&self) -> &dyn Any {
        self
    }
}
6094
/// Circuit node wrapping a `QuaternaryOperator`: reads four input streams and
/// writes the operator's result to `output_stream`.
struct QuaternaryNode<C, I1, I2, I3, I4, O, Op> {
    // Global id of this node (its circuit's id extended with the local node id).
    id: GlobalNodeId,
    // The wrapped operator.
    operator: Op,
    // Input streams, in the order they are passed to the operator.
    input_stream1: Stream<C, I1>,
    input_stream2: Stream<C, I2>,
    input_stream3: Stream<C, I3>,
    input_stream4: Stream<C, I4>,
    // Stream that receives the operator's output on every `eval`.
    output_stream: Stream<C, O>,
    // User-assigned key/value labels (see `Node::set_label`).
    labels: BTreeMap<String, String>,
    // Alias tracking between inputs is currently disabled:
    // // `true` if `input_stream1` is an alias to `input_stream2`, `input_stream3` or
    // // `input_stream4`.
    // is_alias1: bool,
    // // `true` if `input_stream2` is an alias to `input_stream3` or `input_stream4`.
    // is_alias2: bool,
    // // `true` if `input_stream3` is an alias to `input_stream4`.
    // is_alias3: bool,
}
6112
6113impl<C, I1, I2, I3, I4, O, Op> QuaternaryNode<C, I1, I2, I3, I4, O, Op>
6114where
6115    I1: Clone,
6116    I2: Clone,
6117    I3: Clone,
6118    I4: Clone,
6119    Op: QuaternaryOperator<I1, I2, I3, I4, O>,
6120    C: Circuit,
6121{
6122    fn new(
6123        operator: Op,
6124        input_stream1: Stream<C, I1>,
6125        input_stream2: Stream<C, I2>,
6126        input_stream3: Stream<C, I3>,
6127        input_stream4: Stream<C, I4>,
6128        circuit: C,
6129        id: NodeId,
6130    ) -> Self {
6131        // let is_alias1 = input_stream1.ptr_eq(&input_stream2)
6132        //     || input_stream1.ptr_eq(&input_stream3)
6133        //     || input_stream1.ptr_eq(&input_stream4);
6134        // let is_alias2 =
6135        //     input_stream2.ptr_eq(&input_stream3) || input_stream2.ptr_eq(&input_stream4);
6136        // let is_alias3 = input_stream3.ptr_eq(&input_stream4);
6137        Self {
6138            id: circuit.global_node_id().child(id),
6139            operator,
6140            input_stream1,
6141            input_stream2,
6142            input_stream3,
6143            input_stream4,
6144            // is_alias1,
6145            // is_alias2,
6146            // is_alias3,
6147            output_stream: Stream::new(circuit, id),
6148            labels: BTreeMap::new(),
6149        }
6150    }
6151
6152    fn output_stream(&self) -> Stream<C, O> {
6153        self.output_stream.clone()
6154    }
6155}
6156
// `Node` adapter for a `QuaternaryOperator`: lifecycle, checkpoint, compaction
// and replay hooks are forwarded to the wrapped operator unchanged, while
// `eval` reads the four input streams and writes the result to `output_stream`.
impl<C, I1, I2, I3, I4, O, Op> Node for QuaternaryNode<C, I1, I2, I3, I4, O, Op>
where
    C: Circuit,
    I1: Clone + 'static,
    I2: Clone + 'static,
    I3: Clone + 'static,
    I4: Clone + 'static,
    O: Clone + 'static,
    Op: QuaternaryOperator<I1, I2, I3, I4, O>,
{
    fn name(&self) -> Cow<'static, str> {
        self.operator.name()
    }

    fn local_id(&self) -> NodeId {
        // Panics if the global id has no local component.
        self.id.local_node_id().unwrap()
    }

    fn global_id(&self) -> &GlobalNodeId {
        &self.id
    }

    fn is_async(&self) -> bool {
        self.operator.is_async()
    }

    fn is_input(&self) -> bool {
        self.operator.is_input()
    }

    fn ready(&self) -> bool {
        self.operator.ready()
    }

    fn register_ready_callback(&mut self, cb: Box<dyn Fn() + Send + Sync>) {
        self.operator.register_ready_callback(cb);
    }

    // Evaluates the operator on borrowed views of all four inputs (this node
    // never takes ownership of input values) and publishes the result.
    // Justification: see StreamValue::take() comment.
    #[allow(clippy::await_holding_refcell_ref)]
    fn eval<'a>(
        &'a mut self,
    ) -> Pin<Box<dyn Future<Output = Result<Option<Position>, SchedulerError>> + 'a>> {
        Box::pin(async {
            // The values returned by `get()` are RefCell borrows (hence the clippy
            // allow above) that live across the `.await`. This block makes it
            // explicit that they are released before the tokens are consumed below.
            {
                self.output_stream.put(
                    self.operator
                        .eval(
                            Cow::Borrowed(StreamValue::peek(&self.input_stream1.get())),
                            Cow::Borrowed(StreamValue::peek(&self.input_stream2.get())),
                            Cow::Borrowed(StreamValue::peek(&self.input_stream3.get())),
                            Cow::Borrowed(StreamValue::peek(&self.input_stream4.get())),
                        )
                        .await,
                );
            }

            // Consume this node's token on each input value; see
            // `StreamValue::consume_token` for the exact semantics.
            StreamValue::consume_token(self.input_stream1.val());
            StreamValue::consume_token(self.input_stream2.val());
            StreamValue::consume_token(self.input_stream3.val());
            StreamValue::consume_token(self.input_stream4.val());

            // Report whatever flush progress the operator has made, if any.
            Ok(self.operator.flush_progress())
        })
    }

    fn start_transaction(&mut self) {
        self.operator.start_transaction();
    }

    fn flush(&mut self) {
        self.operator.flush();
    }

    fn is_flush_complete(&self) -> bool {
        self.operator.is_flush_complete()
    }

    fn clock_start(&mut self, scope: Scope) {
        self.operator.clock_start(scope);
    }

    fn clock_end(&mut self, scope: Scope) {
        self.operator.clock_end(scope);
    }

    fn init(&mut self) {
        self.operator.init(&self.id);
    }

    fn metadata(&self, output: &mut OperatorMeta) {
        self.operator.metadata(output);
    }

    fn fixedpoint(&self, scope: Scope) -> bool {
        self.operator.fixedpoint(scope)
    }

    // Checkpoints operator state under `base`, identified by this node's
    // persistent id (if it has one). The operator may push files that need
    // committing into `files`.
    fn checkpoint(
        &mut self,
        base: &StoragePath,
        files: &mut Vec<Arc<dyn FileCommitter>>,
    ) -> Result<(), DbspError> {
        self.operator
            .checkpoint(base, self.persistent_id().as_deref(), files)
    }

    // Restores operator state from `base`, using the same persistent id as
    // `checkpoint`.
    fn restore(&mut self, base: &StoragePath) -> Result<(), DbspError> {
        self.operator.restore(base, self.persistent_id().as_deref())
    }

    fn start_compaction(&mut self) {
        self.operator.start_compaction()
    }

    fn clear_state(&mut self) -> Result<(), DbspError> {
        self.operator.clear_state()
    }

    fn start_replay(&mut self) -> Result<(), DbspError> {
        self.operator.start_replay()
    }

    fn is_replay_complete(&self) -> bool {
        self.operator.is_replay_complete()
    }

    fn end_replay(&mut self) -> Result<(), DbspError> {
        self.operator.end_replay()
    }

    fn set_label(&mut self, key: &str, value: &str) {
        self.labels.insert(key.to_string(), value.to_string());
    }

    fn get_label(&self, key: &str) -> Option<&str> {
        self.labels.get(key).map(|s| s.as_str())
    }

    fn labels(&self) -> &BTreeMap<String, String> {
        &self.labels
    }

    fn as_any(&self) -> &dyn Any {
        self
    }
}
6304
6305struct NaryNode<C, I, O, Op>
6306where
6307    I: Clone + 'static,
6308{
6309    id: GlobalNodeId,
6310    operator: Op,
6311    // The second field of the tuple indicates if the stream is an
6312    // alias to an earlier stream.
6313    input_streams: Vec<Stream<C, I>>,
6314    // // Streams that are aliases.
6315    // aliases: Vec<usize>,
6316    output_stream: Stream<C, O>,
6317    labels: BTreeMap<String, String>,
6318}
6319
6320impl<C, I, O, Op> NaryNode<C, I, O, Op>
6321where
6322    I: Clone + 'static,
6323    Op: NaryOperator<I, O>,
6324    C: Circuit,
6325{
6326    fn new<Iter>(operator: Op, input_streams: Iter, circuit: C, id: NodeId) -> Self
6327    where
6328        Iter: IntoIterator<Item = Stream<C, I>>,
6329    {
6330        let mut input_streams: Vec<_> = input_streams.into_iter().collect();
6331        // let mut aliases = Vec::new();
6332        // for i in 0..input_streams.len() {
6333        //     for j in 0..i {
6334        //         if input_streams[i].0.ptr_eq(&input_streams[j].0) {
6335        //             input_streams[i].1 = true;
6336        //             aliases.push(i);
6337        //             break;
6338        //         }
6339        //     }
6340        // }
6341        //aliases.shrink_to_fit();
6342        input_streams.shrink_to_fit();
6343        Self {
6344            id: circuit.global_node_id().child(id),
6345            operator,
6346            input_streams,
6347            //aliases,
6348            output_stream: Stream::new(circuit, id),
6349            labels: BTreeMap::new(),
6350        }
6351    }
6352
6353    fn output_stream(&self) -> Stream<C, O> {
6354        self.output_stream.clone()
6355    }
6356}
6357
// `Node` adapter for a `NaryOperator`: lifecycle, checkpoint, compaction and
// replay hooks are forwarded to the wrapped operator unchanged, while `eval`
// reads every input stream and writes the result to `output_stream`.
impl<C, I, O, Op> Node for NaryNode<C, I, O, Op>
where
    C: Circuit,
    I: Clone,
    O: Clone + 'static,
    Op: NaryOperator<I, O>,
{
    fn name(&self) -> Cow<'static, str> {
        self.operator.name()
    }

    fn local_id(&self) -> NodeId {
        // Panics if the global id has no local component.
        self.id.local_node_id().unwrap()
    }

    fn global_id(&self) -> &GlobalNodeId {
        &self.id
    }

    fn is_async(&self) -> bool {
        self.operator.is_async()
    }

    fn is_input(&self) -> bool {
        self.operator.is_input()
    }

    fn ready(&self) -> bool {
        self.operator.ready()
    }

    fn register_ready_callback(&mut self, cb: Box<dyn Fn() + Send + Sync>) {
        self.operator.register_ready_callback(cb);
    }

    // Evaluates the operator on borrowed views of every input (this node never
    // takes ownership of input values) and publishes the result.
    fn eval<'a>(
        &'a mut self,
    ) -> Pin<Box<dyn Future<Output = Result<Option<Position>, SchedulerError>> + 'a>> {
        Box::pin(async {
            // Borrow every input up front; `refs` is held across the `.await`
            // below, as in the fixed-arity nodes.
            let refs = self
                .input_streams
                .iter()
                .map(|stream| stream.get())
                .collect::<Vec<_>>();

            self.output_stream.put(
                self.operator
                    .eval(refs.iter().map(|r| Cow::Borrowed(StreamValue::peek(r))))
                    .await,
            );

            // Release all input borrows before consuming tokens on the same
            // streams. This ordering is intentional; do not move this line.
            std::mem::drop(refs);

            // Consume this node's token on each input value; see
            // `StreamValue::consume_token` for the exact semantics.
            for i in self.input_streams.iter() {
                StreamValue::consume_token(i.val());
            }
            // Report whatever flush progress the operator has made, if any.
            Ok(self.operator.flush_progress())
        })
    }

    fn start_transaction(&mut self) {
        self.operator.start_transaction();
    }

    fn flush(&mut self) {
        self.operator.flush();
    }

    fn is_flush_complete(&self) -> bool {
        self.operator.is_flush_complete()
    }

    fn clock_start(&mut self, scope: Scope) {
        self.operator.clock_start(scope);
    }

    fn clock_end(&mut self, scope: Scope) {
        self.operator.clock_end(scope);
    }

    fn init(&mut self) {
        self.operator.init(&self.id);
    }

    fn metadata(&self, output: &mut OperatorMeta) {
        self.operator.metadata(output);
    }

    fn fixedpoint(&self, scope: Scope) -> bool {
        self.operator.fixedpoint(scope)
    }

    // Checkpoints operator state under `base`, identified by this node's
    // persistent id (if it has one). The operator may push files that need
    // committing into `files`.
    fn checkpoint(
        &mut self,
        base: &StoragePath,
        files: &mut Vec<Arc<dyn FileCommitter>>,
    ) -> Result<(), DbspError> {
        self.operator
            .checkpoint(base, self.persistent_id().as_deref(), files)
    }

    // Restores operator state from `base`, using the same persistent id as
    // `checkpoint`.
    fn restore(&mut self, base: &StoragePath) -> Result<(), DbspError> {
        self.operator.restore(base, self.persistent_id().as_deref())
    }

    fn start_compaction(&mut self) {
        self.operator.start_compaction()
    }

    fn clear_state(&mut self) -> Result<(), DbspError> {
        self.operator.clear_state()
    }

    fn start_replay(&mut self) -> Result<(), DbspError> {
        self.operator.start_replay()
    }

    fn is_replay_complete(&self) -> bool {
        self.operator.is_replay_complete()
    }

    fn end_replay(&mut self) -> Result<(), DbspError> {
        self.operator.end_replay()
    }

    fn set_label(&mut self, key: &str, value: &str) {
        self.labels.insert(key.to_string(), value.to_string());
    }

    fn get_label(&self, key: &str) -> Option<&str> {
        self.labels.get(key).map(|s| s.as_str())
    }

    fn labels(&self) -> &BTreeMap<String, String> {
        &self.labels
    }

    fn as_any(&self) -> &dyn Any {
        self
    }
}
6499
// The output half of a feedback node.  We implement a feedback node using a
// pair of nodes: `FeedbackOutputNode` is connected to the circuit as a source
// node (i.e., it does not have an input stream) and thus gets evaluated first
// in each time stamp.  `FeedbackInputNode` is a sink node.  This way the
// circuit graph remains acyclic and can be scheduled in a topological order.
// Both halves share the same operator through `Rc<RefCell<Op>>`.
struct FeedbackOutputNode<C, I, O, Op>
where
    C: Circuit,
{
    // Global id of this (output) half.
    id: GlobalNodeId,
    // Operator shared with the matching `FeedbackInputNode`.
    operator: Rc<RefCell<Op>>,
    // Receives the operator's `get_output()` on every `eval`.
    output_stream: Stream<C, O>,
    // Set only by `with_export`: a stream in the parent circuit that receives
    // the operator's `get_final_output()` when `clock_end(0)` is called.
    export_stream: Option<Stream<C::Parent, O>>,
    // Records the operator's input type `I`; this half never reads an input.
    phantom_input: PhantomData<I>,
    // User-assigned key/value labels (see `Node::set_label`).
    labels: BTreeMap<String, String>,
}
6516
6517impl<C, I, O, Op> FeedbackOutputNode<C, I, O, Op>
6518where
6519    C: Circuit,
6520    Op: StrictUnaryOperator<I, O>,
6521{
6522    fn new(operator: Rc<RefCell<Op>>, circuit: C, id: NodeId) -> Self {
6523        Self {
6524            id: circuit.global_node_id().child(id),
6525            operator,
6526            output_stream: Stream::new(circuit.clone(), id),
6527            export_stream: None,
6528            phantom_input: PhantomData,
6529            labels: BTreeMap::new(),
6530        }
6531    }
6532
6533    fn with_export(operator: Rc<RefCell<Op>>, circuit: C, id: NodeId) -> Self {
6534        let mut result = Self::new(operator, circuit.clone(), id);
6535        result.export_stream = Some(Stream::with_origin(
6536            circuit.parent(),
6537            circuit.allocate_stream_id(),
6538            circuit.node_id(),
6539            GlobalNodeId::child_of(&circuit, id),
6540        ));
6541        result
6542    }
6543
6544    fn output_stream(&self) -> Stream<C, O> {
6545        self.output_stream.clone()
6546    }
6547}
6548
// `Node` implementation for the output half of a feedback node. This half owns
// the shared operator's clock, checkpoint, compaction and state-clearing
// hooks. `FeedbackInputNode` skips those hooks so they run only once.
impl<C, I, O, Op> Node for FeedbackOutputNode<C, I, O, Op>
where
    C: Circuit,
    I: Data,
    O: Clone + 'static,
    Op: StrictUnaryOperator<I, O>,
{
    fn local_id(&self) -> NodeId {
        // Panics if the global id has no local component.
        self.id.local_node_id().unwrap()
    }

    fn global_id(&self) -> &GlobalNodeId {
        &self.id
    }

    fn name(&self) -> Cow<'static, str> {
        self.operator.borrow().name()
    }

    fn is_async(&self) -> bool {
        self.operator.borrow().is_async()
    }

    fn is_input(&self) -> bool {
        self.operator.borrow().is_input()
    }

    fn ready(&self) -> bool {
        self.operator.borrow().ready()
    }

    fn register_ready_callback(&mut self, cb: Box<dyn Fn() + Send + Sync>) {
        self.operator.borrow_mut().register_ready_callback(cb);
    }

    // Emits the operator's current output. This half has no input and does not
    // report flush progress.
    fn eval<'a>(
        &'a mut self,
    ) -> Pin<Box<dyn Future<Output = Result<Option<Position>, SchedulerError>> + 'a>> {
        Box::pin(async {
            self.output_stream
                .put(self.operator.borrow_mut().get_output());
            Ok(None)
        })
    }

    fn start_transaction(&mut self) {
        self.operator.borrow_mut().start_transaction();
    }

    fn flush(&mut self) {
        self.operator.borrow_mut().flush();
    }

    fn is_flush_complete(&self) -> bool {
        self.operator.borrow().is_flush_complete()
    }

    fn clock_start(&mut self, scope: Scope) {
        self.operator.borrow_mut().clock_start(scope)
    }

    fn clock_end(&mut self, scope: Scope) {
        // At `clock_end(0)`, publish the operator's final output to the parent
        // circuit (only when created via `with_export`). This must happen before
        // the operator's own `clock_end` runs below.
        if scope == 0
            && let Some(export_stream) = &mut self.export_stream
        {
            export_stream.put(self.operator.borrow_mut().get_final_output());
        }
        self.operator.borrow_mut().clock_end(scope);
    }

    fn init(&mut self) {
        self.operator.borrow_mut().init(&self.id);
    }

    fn metadata(&self, _output: &mut OperatorMeta) {
        // Avoid producing duplicate metadata for input and output parts of the operator;
        // otherwise it will be double-counted in circuit-level metrics.
    }

    fn fixedpoint(&self, scope: Scope) -> bool {
        self.operator.borrow().fixedpoint(scope)
    }

    // The shared operator is checkpointed and restored from this half only.
    // `FeedbackInputNode` makes both of these no-ops.
    fn checkpoint(
        &mut self,
        base: &StoragePath,
        files: &mut Vec<Arc<dyn FileCommitter>>,
    ) -> Result<(), DbspError> {
        self.operator
            .borrow_mut()
            .checkpoint(base, self.persistent_id().as_deref(), files)
    }

    fn restore(&mut self, base: &StoragePath) -> Result<(), DbspError> {
        self.operator
            .borrow_mut()
            .restore(base, self.persistent_id().as_deref())
    }

    fn start_compaction(&mut self) {
        self.operator.borrow_mut().start_compaction()
    }

    fn clear_state(&mut self) -> Result<(), DbspError> {
        self.operator.borrow_mut().clear_state()
    }

    fn start_replay(&mut self) -> Result<(), DbspError> {
        self.operator.borrow_mut().start_replay()
    }

    fn is_replay_complete(&self) -> bool {
        self.operator.borrow().is_replay_complete()
    }

    fn end_replay(&mut self) -> Result<(), DbspError> {
        self.operator.borrow_mut().end_replay()
    }

    fn set_label(&mut self, key: &str, value: &str) {
        self.labels.insert(key.to_string(), value.to_string());
    }

    fn get_label(&self, key: &str) -> Option<&str> {
        self.labels.get(key).map(|s| s.as_str())
    }

    fn labels(&self) -> &BTreeMap<String, String> {
        &self.labels
    }

    fn as_any(&self) -> &dyn Any {
        self
    }
}
6684
/// The input half of a feedback node.
///
/// A sink node that feeds its input stream into the operator it shares with
/// the corresponding `FeedbackOutputNode`.
struct FeedbackInputNode<C, I, O, Op> {
    // Id of this node (the input half).
    id: GlobalNodeId,
    // Operator shared with the output half.
    operator: Rc<RefCell<Op>>,
    // Stream whose values are fed to the operator on every `eval`.
    input_stream: Stream<C, I>,
    // Records the operator's output type `O`; this half produces no output.
    phantom_output: PhantomData<O>,
    // User-assigned key/value labels (see `Node::set_label`).
    labels: BTreeMap<String, String>,
}
6694
6695impl<C, I, O, Op> FeedbackInputNode<C, I, O, Op>
6696where
6697    Op: StrictUnaryOperator<I, O>,
6698    C: Circuit,
6699{
6700    fn new(operator: Rc<RefCell<Op>>, input_stream: Stream<C, I>, id: NodeId) -> Self {
6701        Self {
6702            id: input_stream.circuit().global_node_id().child(id),
6703            operator,
6704            input_stream,
6705            phantom_output: PhantomData,
6706            labels: BTreeMap::new(),
6707        }
6708    }
6709}
6710
6711impl<C, I, O, Op> Node for FeedbackInputNode<C, I, O, Op>
6712where
6713    Op: StrictUnaryOperator<I, O>,
6714    I: Data,
6715    O: 'static,
6716    C: Clone + 'static,
6717{
6718    fn name(&self) -> Cow<'static, str> {
6719        self.operator.borrow().name()
6720    }
6721
6722    fn local_id(&self) -> NodeId {
6723        self.id.local_node_id().unwrap()
6724    }
6725
6726    fn global_id(&self) -> &GlobalNodeId {
6727        &self.id
6728    }
6729
6730    fn is_async(&self) -> bool {
6731        self.operator.borrow().is_async()
6732    }
6733
6734    fn is_input(&self) -> bool {
6735        self.operator.borrow().is_input()
6736    }
6737
6738    fn ready(&self) -> bool {
6739        self.operator.borrow().ready()
6740    }
6741
6742    fn register_ready_callback(&mut self, cb: Box<dyn Fn() + Send + Sync>) {
6743        self.operator.borrow_mut().register_ready_callback(cb);
6744    }
6745
6746    // Justification: see StreamValue::take() comment.
6747    #[allow(clippy::await_holding_refcell_ref)]
6748    fn eval<'a>(
6749        &'a mut self,
6750    ) -> Pin<Box<dyn Future<Output = Result<Option<Position>, SchedulerError>> + 'a>> {
6751        Box::pin(async {
6752            match StreamValue::take(self.input_stream.val()) {
6753                Some(v) => self.operator.borrow_mut().eval_strict_owned(v).await,
6754                None => {
6755                    self.operator
6756                        .borrow_mut()
6757                        .eval_strict(StreamValue::peek(&self.input_stream.get()))
6758                        .await
6759                }
6760            };
6761
6762            StreamValue::consume_token(self.input_stream.val());
6763
6764            Ok(None)
6765        })
6766    }
6767
6768    fn start_transaction(&mut self) {
6769        self.operator.borrow_mut().start_transaction();
6770    }
6771
6772    fn flush(&mut self) {
6773        self.operator.borrow_mut().flush_input();
6774    }
6775
6776    fn is_flush_complete(&self) -> bool {
6777        self.operator.borrow().is_flush_input_complete()
6778    }
6779
6780    // Don't call `clock_start`/`clock_end` on the operator.  `FeedbackOutputNode`
6781    // will do that.
6782    fn clock_start(&mut self, _scope: Scope) {}
6783
6784    fn clock_end(&mut self, _scope: Scope) {}
6785
6786    fn init(&mut self) {
6787        self.operator.borrow_mut().init(&self.id);
6788    }
6789
6790    fn metadata(&self, output: &mut OperatorMeta) {
6791        self.operator.borrow().metadata(output)
6792    }
6793
6794    fn fixedpoint(&self, scope: Scope) -> bool {
6795        self.operator.borrow().fixedpoint(scope)
6796    }
6797
6798    fn checkpoint(
6799        &mut self,
6800        _base: &StoragePath,
6801        _files: &mut Vec<Arc<dyn FileCommitter>>,
6802    ) -> Result<(), DbspError> {
6803        // The Z-1 operator consists of two logical parts.
6804        // The first part gets invoked at the start of a clock cycle to retrieve the
6805        // state stored at the previous clock tick. The second one gets invoked
6806        // to store the updated state inside the operator. We only want to
6807        // invoke commit on one of them, doesn't matter which (so we
6808        // do it in FeedbackOutputNode)
6809        Ok(())
6810    }
6811
6812    fn restore(&mut self, _base: &StoragePath) -> Result<(), DbspError> {
6813        // See comment in `commit`.
6814        Ok(())
6815    }
6816
6817    fn start_compaction(&mut self) {}
6818
6819    fn clear_state(&mut self) -> Result<(), DbspError> {
6820        Ok(())
6821    }
6822
6823    fn start_replay(&mut self) -> Result<(), DbspError> {
6824        self.operator.borrow_mut().start_replay()
6825    }
6826
6827    fn is_replay_complete(&self) -> bool {
6828        self.operator.borrow().is_replay_complete()
6829    }
6830
6831    fn end_replay(&mut self) -> Result<(), DbspError> {
6832        self.operator.borrow_mut().end_replay()
6833    }
6834
6835    fn set_label(&mut self, key: &str, value: &str) {
6836        self.labels.insert(key.to_string(), value.to_string());
6837    }
6838
6839    fn get_label(&self, key: &str) -> Option<&str> {
6840        self.labels.get(key).map(|s| s.as_str())
6841    }
6842
6843    fn labels(&self) -> &BTreeMap<String, String> {
6844        &self.labels
6845    }
6846
6847    fn as_any(&self) -> &dyn Any {
6848        self
6849    }
6850}
6851
/// Input connector of a feedback operator.
///
/// This struct is part of the mechanism for constructing a feedback loop in a
/// circuit. It is returned by [`Circuit::add_feedback`] and represents the
/// input port of an operator whose input stream does not exist yet.  Once the
/// input stream has been created, it can be connected to the operator using
/// [`FeedbackConnector::connect`]. See [`Circuit::add_feedback`] for details.
pub struct FeedbackConnector<C, I, O, Op> {
    // Local id of the output half of the feedback node. It is passed to
    // `connect_feedback_with_preference` when the loop is closed.
    output_node_id: NodeId,
    // Circuit in which the feedback loop is being built.
    circuit: C,
    // Operator shared by both halves of the feedback node.
    operator: Rc<RefCell<Op>>,
    // Record the operator's input and output types.
    phantom_input: PhantomData<I>,
    phantom_output: PhantomData<O>,
}
6866
6867impl<C, I, O, Op> FeedbackConnector<C, I, O, Op>
6868where
6869    Op: StrictUnaryOperator<I, O>,
6870{
6871    fn new(output_node_id: NodeId, circuit: C, operator: Rc<RefCell<Op>>) -> Self {
6872        Self {
6873            output_node_id,
6874            circuit,
6875            operator,
6876            phantom_input: PhantomData,
6877            phantom_output: PhantomData,
6878        }
6879    }
6880}
6881
6882impl<C, I, O, Op> FeedbackConnector<C, I, O, Op>
6883where
6884    Op: StrictUnaryOperator<I, O>,
6885    I: Data,
6886    O: Data,
6887    C: Circuit,
6888{
6889    pub fn operator_mut(&self) -> RefMut<'_, Op> {
6890        self.operator.borrow_mut()
6891    }
6892
6893    /// Connect `input_stream` as input to the operator.
6894    ///
6895    /// See [`Circuit::add_feedback`] for details.
6896    /// Returns node id of the input node.
6897    pub fn connect(self, input_stream: &Stream<C, I>) {
6898        self.connect_with_preference(input_stream, OwnershipPreference::INDIFFERENT)
6899    }
6900
6901    pub fn connect_with_preference(
6902        self,
6903        input_stream: &Stream<C, I>,
6904        input_preference: OwnershipPreference,
6905    ) {
6906        self.circuit.connect_feedback_with_preference(
6907            self.output_node_id,
6908            self.operator,
6909            input_stream,
6910            input_preference,
6911        )
6912    }
6913}
6914
// A nested circuit instantiated as a node in a parent circuit.
struct ChildNode<C>
where
    C: Circuit,
{
    // Global id of the node; the same as the nested circuit's global id.
    id: GlobalNodeId,
    // The nested circuit.
    circuit: C,
    // Executor that runs a transaction of the nested circuit on every `eval`.
    executor: Box<dyn Executor<C>>,
    // User-assigned key/value labels (see `Node::set_label`).
    labels: BTreeMap<String, String>,
    // Offset added to the parent's clock scope before it is forwarded to the
    // nested circuit (`clock_start`, `clock_end`, `fixedpoint`).
    nesting_depth: Scope,
}
6926
// Clears the nested circuit when the node that owns it is dropped.
impl<C> Drop for ChildNode<C>
where
    C: Circuit,
{
    fn drop(&mut self) {
        // Explicitly deallocate all nodes in the circuit to break
        // cyclic `Rc` references between circuits and streams. Otherwise
        // those references would keep each other alive and leak.
        self.circuit.clear();
    }
}
6937
6938impl<C> ChildNode<C>
6939where
6940    C: Circuit,
6941{
6942    fn new<E>(circuit: C, nesting_depth: Scope, executor: E) -> Self
6943    where
6944        E: Executor<C>,
6945    {
6946        Self {
6947            id: circuit.global_node_id(),
6948            circuit,
6949            executor: Box::new(executor) as Box<dyn Executor<C>>,
6950            labels: BTreeMap::new(),
6951            nesting_depth,
6952        }
6953    }
6954}
6955
6956impl<C> Node for ChildNode<C>
6957where
6958    C: Circuit,
6959{
6960    fn name(&self) -> Cow<'static, str> {
6961        Cow::Borrowed("Subcircuit")
6962    }
6963
6964    fn local_id(&self) -> NodeId {
6965        self.id.local_node_id().unwrap()
6966    }
6967
6968    fn global_id(&self) -> &GlobalNodeId {
6969        &self.id
6970    }
6971
6972    fn is_circuit(&self) -> bool {
6973        true
6974    }
6975
6976    fn is_async(&self) -> bool {
6977        false
6978    }
6979
6980    fn is_input(&self) -> bool {
6981        false
6982    }
6983
6984    fn ready(&self) -> bool {
6985        true
6986    }
6987
6988    fn eval<'a>(
6989        &'a mut self,
6990    ) -> Pin<Box<dyn Future<Output = Result<Option<Position>, SchedulerError>> + 'a>> {
6991        Box::pin(async {
6992            // We may want to make the executor responsible for evaluating import nodes
6993            // if there is a need for customizing this behavior.
6994            for node_id in self.circuit.import_nodes() {
6995                self.circuit.eval_import_node(node_id).await;
6996            }
6997            self.executor.transaction(&self.circuit).await?;
6998            Ok(None)
6999        })
7000    }
7001
7002    fn start_transaction(&mut self) {
7003        // Nested circuit has its own transactions.
7004    }
7005
7006    fn flush(&mut self) {
7007        self.executor.flush();
7008    }
7009
7010    fn is_flush_complete(&self) -> bool {
7011        self.executor.is_flush_complete()
7012    }
7013
7014    fn clock_start(&mut self, scope: Scope) {
7015        self.circuit.clock_start(scope + self.nesting_depth);
7016    }
7017
7018    fn clock_end(&mut self, scope: Scope) {
7019        self.circuit.clock_end(scope + self.nesting_depth);
7020    }
7021
7022    fn metadata(&self, _meta: &mut OperatorMeta) {}
7023
7024    fn fixedpoint(&self, scope: Scope) -> bool {
7025        self.circuit.check_fixedpoint(scope + self.nesting_depth)
7026    }
7027
7028    fn map_nodes_recursive(
7029        &self,
7030        f: &mut dyn FnMut(&dyn Node) -> Result<(), DbspError>,
7031    ) -> Result<(), DbspError> {
7032        self.circuit.map_nodes_recursive(f)
7033    }
7034
7035    fn checkpoint(
7036        &mut self,
7037        _base: &StoragePath,
7038        _files: &mut Vec<Arc<dyn FileCommitter>>,
7039    ) -> Result<(), DbspError> {
7040        Ok(())
7041    }
7042
7043    fn restore(&mut self, _base: &StoragePath) -> Result<(), DbspError> {
7044        Ok(())
7045    }
7046
7047    fn start_compaction(&mut self) {
7048        self.circuit.start_compaction();
7049    }
7050
7051    fn clear_state(&mut self) -> Result<(), DbspError> {
7052        self.circuit
7053            .map_local_nodes_mut(&mut |node| node.clear_state())
7054    }
7055
    /// No-op: this node never acts as a replay source itself.
    // NOTE(review): "never a replay source" is inferred from the trivial
    // body; confirm that the replay machinery never selects this node.
    fn start_replay(&mut self) -> Result<(), DbspError> {
        Ok(())
    }
7059
    /// Always reports replay as complete, since `start_replay` is a no-op here.
    fn is_replay_complete(&self) -> bool {
        true
    }
7063
    /// No-op counterpart of `start_replay`.
    fn end_replay(&mut self) -> Result<(), DbspError> {
        Ok(())
    }
7067
7068    fn set_label(&mut self, key: &str, value: &str) {
7069        self.labels.insert(key.to_string(), value.to_string());
7070    }
7071
7072    fn get_label(&self, key: &str) -> Option<&str> {
7073        self.labels.get(key).map(|s| s.as_str())
7074    }
7075
    /// Returns all labels attached to this node.
    fn labels(&self) -> &BTreeMap<String, String> {
        &self.labels
    }
7079
7080    fn map_child(&self, path: &[NodeId], f: &mut dyn FnMut(&dyn Node)) {
7081        self.circuit.map_node_relative(path, f);
7082    }
7083
7084    fn map_child_mut(&self, path: &[NodeId], f: &mut dyn FnMut(&mut dyn Node)) {
7085        self.circuit.map_node_mut_relative(path, f);
7086    }
7087
    /// Exposes this node as `&dyn Any` so callers can downcast it to its
    /// concrete type.
    fn as_any(&self) -> &dyn Any {
        self
    }
7091
7092    fn as_circuit(&self) -> Option<&dyn CircuitBase> {
7093        Some(&self.circuit)
7094    }
7095}
7096
/// Top-level circuit with executor.
///
/// This is the interface to a circuit created with [`RootCircuit::build`].
/// Call [`CircuitHandle::transaction`] to run the circuit in the context of the
/// current thread.
// NOTE: Rust drops fields in declaration order after `Drop::drop` runs, so
// reordering these fields changes teardown order.
pub struct CircuitHandle {
    /// The root circuit driven by this handle.
    circuit: RootCircuit,
    /// Executor that evaluates `circuit` (transactions, steps, commits).
    executor: Box<dyn Executor<RootCircuit>>,
    /// Runtime used to block on executor futures from the calling thread.
    tokio_runtime: TokioRuntime,
    /// Replay bookkeeping. Set by `restore` when the circuit needs backfill
    /// and taken by `complete_replay`; `None` otherwise.
    replay_info: Option<BootstrapInfo>,
}
7108
// Tears down the root circuit when the handle is dropped: emits the clock-end
// scheduler event, ends the clock (unless already panicking), then clears all
// nodes to break reference cycles.
impl Drop for CircuitHandle {
    fn drop(&mut self) {
        // The scheduler event is logged unconditionally, even while unwinding
        // from a panic; only `clock_end` below is guarded.
        self.circuit
            .log_scheduler_event(&SchedulerEvent::clock_end());

        // Prevent nested panic when `drop` is invoked while panicking
        // and `clock_end` triggers another panic due to violated invariants
        // since the original panic interrupted normal execution.
        if !panicking() {
            self.circuit.clock_end(0)
        }

        // We must explicitly deallocate all nodes in the circuit to break
        // cyclic `Rc` references between circuits and streams.  Alternatively,
        // we could use weak references to break cycles, but we'd have to
        // pay the cost of upgrading weak references on each access.
        self.circuit.clear();
    }
}
7128
/// Operators involved in the replay phase of a circuit.
///
/// Returned by `CircuitHandle::restore` when the restored circuit needs
/// backfill. The handle keeps a copy until `CircuitHandle::complete_replay`.
#[derive(Debug, Clone, PartialEq, Eq, Serialize)]
pub struct BootstrapInfo {
    /// Operators that will replay their contents during the replay phase.
    /// Each is mapped to the id of the replay stream it feeds; that stream is
    /// deleted when replay completes.
    pub replay_sources: BTreeMap<NodeId, StreamId>,

    /// Operators whose state is cleared and recomputed from upstream nodes
    /// during backfill. Each is mapped to its persistent operator id, or `None`
    /// if the node has no such label.
    // NOTE(review): `allow(dead_code)` presumably silences a lint because no
    // code reads this field back; confirm it is still needed.
    #[allow(dead_code)]
    pub need_backfill: BTreeMap<NodeId, Option<String>>,
}
7139
7140impl CircuitHandle {
7141    /// Start and instantly commit a transaction, waiting for the commit to complete.
7142    pub fn transaction(&self) -> Result<(), DbspError> {
7143        self.tokio_runtime
7144            .block_on(async {
7145                let local_set = LocalSet::new();
7146                local_set
7147                    .run_until(async { self.executor.transaction(&self.circuit).await })
7148                    .await
7149            })
7150            .map_err(DbspError::Scheduler)
7151    }
7152
7153    /// Start a transaction.
7154    ///
7155    /// A transaction consists of a sequence of steps that evaluate a set of inputs for a single logical
7156    /// clock tick.
7157    pub fn start_transaction(&self) -> Result<(), DbspError> {
7158        self.tokio_runtime
7159            .block_on(async {
7160                let local_set = LocalSet::new();
7161                local_set
7162                    .run_until(async { self.executor.start_transaction(&self.circuit).await })
7163                    .await
7164            })
7165            .map_err(DbspError::Scheduler)
7166    }
7167
7168    /// Start committing the current transaction by forcing all operators to process
7169    /// their inputs to completion.
7170    ///
7171    /// The caller must invoke `step` repeatedly until the commit is complete.
7172    pub fn start_commit_transaction(&self) -> Result<(), DbspError> {
7173        self.executor
7174            .start_commit_transaction()
7175            .map_err(DbspError::Scheduler)
7176    }
7177
7178    pub fn is_commit_complete(&self) -> bool {
7179        self.executor.is_commit_complete()
7180    }
7181
7182    pub fn commit_progress(&self) -> CommitProgress {
7183        self.executor.commit_progress()
7184    }
7185
7186    /// Evaluate the circuit for a single step.
7187    pub fn step(&self) -> Result<(), DbspError> {
7188        self.tokio_runtime
7189            .block_on(async {
7190                let local_set = LocalSet::new();
7191                local_set
7192                    .run_until(async { self.executor.step(&self.circuit).await })
7193                    .await
7194            })
7195            .map_err(DbspError::Scheduler)
7196    }
7197
7198    pub fn checkpoint(
7199        &mut self,
7200        base: &StoragePath,
7201        files: &mut Vec<Arc<dyn FileCommitter>>,
7202    ) -> Result<(), DbspError> {
7203        // if Runtime::worker_index() == 0 {
7204        //     self.circuit.to_dot_file(
7205        //         |node| {
7206        //             Some(crate::utils::DotNodeAttributes::new().with_label(&format!(
7207        //                 "{}-{}-{}",
7208        //                 node.local_id(),
7209        //                 node.name(),
7210        //                 node.persistent_id().unwrap_or_default()
7211        //             )))
7212        //         },
7213        //         |edge| {
7214        //             let style = if edge.is_dependency() {
7215        //                 Some("dotted".to_string())
7216        //             } else {
7217        //                 None
7218        //             };
7219        //             let label = if let Some(stream) = &edge.stream {
7220        //                 Some(format!("consumers: {}", stream.num_consumers()))
7221        //             } else {
7222        //                 None
7223        //             };
7224        //             Some(
7225        //                 crate::utils::DotEdgeAttributes::new(edge.stream_id())
7226        //                     .with_style(style)
7227        //                     .with_label(label),
7228        //             )
7229        //         },
7230        //         "commit.dot",
7231        //     );
7232        //     info!("CircuitHandle::commit: circuit written to commit.dot");
7233        // }
7234
7235        self.circuit
7236            .map_nodes_recursive_mut(&mut |node: &mut dyn Node| {
7237                let _span = Span::new("operator")
7238                    .with_category("Checkpoint")
7239                    .with_tooltip(|| format!("{} {}", node.name(), node.global_id()));
7240                DBSP_OPERATOR_COMMIT_LATENCY_MICROSECONDS
7241                    .record_callback(|| node.checkpoint(base, files))
7242            })
7243    }
7244
7245    /// Restores the circuit from a checkpoint.
7246    ///
7247    /// Restore the circuit from a checkpoint and prepare it to backfill new and
7248    /// modified parts of the circuit if necessary.
7249    ///
7250    /// 1. Find and restore the checkpointed state of each operator.
7251    /// 2. Identify stateful operators (such as integrals and output nodes) that don't have
7252    ///    a checkpoint and require backfill (the `need_backfill` set).
7253    /// 3. Iterate backward from `need_backfill` nodes to find all operators that
7254    ///    should participate in the replay phase of the circuit. Iteration stops when
7255    ///    reaching a stream whose contents can be replayed from an existing node
7256    ///    that has a checkpoint or an input node.
7257    /// 4. If the circuit requires backfill, prepare the circuit for replay by
7258    ///    configuring the scheduler to only schedule nodes that participate in
7259    ///    backfill.
7260    ///
7261    /// Returns `None` if the circuit does not require backfill; returns info about
7262    /// nodes that participate in backfill otherwise.
7263    ///
7264    /// * After calling this function, the client can invoke `step` repeatedly for replay to make progress.
7265    /// * Use `is_replay_complete` to determine whether the circuit has finished the replay.
7266    /// * Use `complete_replay` to finalize the replay phase and prepare the circuit for normal operation after replay is complete.
7267    pub fn restore(&mut self, base: &StoragePath) -> Result<Option<BootstrapInfo>, DbspError> {
7268        // Nodes that will act as replay sources during the replay phase of the circuit.
7269        let mut replay_sources: BTreeMap<NodeId, StreamId> = BTreeMap::new();
7270
7271        // Nodes that require backfill from upstream nodes.
7272        let mut need_backfill: BTreeSet<GlobalNodeId> = BTreeSet::new();
7273
7274        // debug!("CircuitHandle::restore: restoring from checkpoint {}", base);
7275
7276        // In persistent mode, refuse to silently treat a vanished state
7277        // file as backfill. dependencies.json records every state file at
7278        // commit time; if any are gone, the corruption is structural and
7279        // operators need to see it. A missing backend in persistent mode is
7280        // an invariant violation, surface it rather than skipping verify.
7281        if Runtime::mode() == Mode::Persistent {
7282            let backend = Runtime::storage_backend()?;
7283            crate::circuit::checkpointer::Checkpointer::verify_checkpoint_intact(
7284                backend.as_ref(),
7285                base,
7286            )?;
7287        }
7288
7289        // Initialize `need_backfill` to operators without a checkpoint.
7290        // Fail if there are any errors other than NotFound.
7291        // By the end of this, `need_backfill` will contain all new integrals and output
7292        // nodes that need backfill.
7293        self.circuit.map_nodes_recursive_mut(
7294            &mut |node: &mut dyn Node| match node.restore(base) {
7295                Err(e) if Runtime::mode() == Mode::Ephemeral => Err(e),
7296                Err(DbspError::Storage(ioerror)) if ioerror.kind() == ErrorKind::NotFound => {
7297                    need_backfill.insert(node.global_id().clone());
7298                    Ok(())
7299                }
7300                Err(DbspError::IO(ioerror)) if ioerror.kind() == ErrorKind::NotFound => {
7301                    need_backfill.insert(node.global_id().clone());
7302                    Ok(())
7303                }
7304                Err(e) => Err(e),
7305                Ok(()) => Ok(()),
7306            },
7307        )?;
7308
7309        // Additional nodes that must be backfilled to keep the balancer state consistent.
7310        let additional_need_backfill: BTreeSet<GlobalNodeId> =
7311            self.invalidate_balancer_clusters(&need_backfill);
7312        if Runtime::worker_index() == 0 {
7313            debug!(
7314                "CircuitHandle::restore: additional need backfill: {:?}",
7315                additional_need_backfill
7316            );
7317        }
7318        need_backfill.extend(additional_need_backfill);
7319
7320        debug!(
7321            "worker {}: CircuitHandle::restore: found {} operators that require backfill: {:?}",
7322            Runtime::worker_index(),
7323            need_backfill.len(),
7324            need_backfill.iter().cloned().collect::<Vec<GlobalNodeId>>()
7325        );
7326
7327        // We can only backfill a nested circuit as a whole, so if we encounter at least
7328        // one node in a nested circuit that needs backfill, we backfill the
7329        // entire circuit.
7330        let need_backfill = need_backfill
7331            .into_iter()
7332            .map(|gid| gid.top_level_ancestor())
7333            .collect::<BTreeSet<_>>();
7334
7335        // Iterate backward from `need_backfill` nodes to find all operators that
7336        // should participate in the replay.
7337
7338        // All nodes that participate in the replay phase, including replay sources and
7339        // nodes that need backfilling.
7340        let mut participate_in_backfill = need_backfill.clone();
7341
7342        // New nodes computed at each iteration.
7343        let mut participate_in_backfill_new = need_backfill.clone();
7344
7345        while !participate_in_backfill_new.is_empty() {
7346            participate_in_backfill_new = self.compute_replay_nodes_step(
7347                &mut replay_sources,
7348                &need_backfill,
7349                participate_in_backfill_new,
7350                &mut participate_in_backfill,
7351            )?;
7352        }
7353
7354        debug!(
7355            "worker {}: CircuitHandle::restore: replaying {} operators: {:?}\n  backfilling {} operators: {:?}\n  replay circuit consists of {} operators: {:?}",
7356            Runtime::worker_index(),
7357            replay_sources.len(),
7358            replay_sources.keys().cloned().collect::<Vec<NodeId>>(),
7359            need_backfill.len(),
7360            need_backfill.iter().cloned().collect::<Vec<NodeId>>(),
7361            participate_in_backfill.len(),
7362            participate_in_backfill
7363                .iter()
7364                .cloned()
7365                .collect::<Vec<NodeId>>()
7366        );
7367
7368        let replay_nodes = replay_sources.keys().cloned().collect::<BTreeSet<_>>();
7369
7370        assert!(
7371            replay_nodes
7372                .intersection(&need_backfill)
7373                .collect::<Vec<_>>()
7374                .is_empty()
7375        );
7376
7377        // Nodes that will be backfilled from upstream nodes, including need_backfill nodes
7378        // and their transitive ancestors.
7379        let nodes_to_backfill = participate_in_backfill
7380            .difference(&replay_nodes)
7381            .cloned()
7382            .collect::<BTreeSet<_>>();
7383
7384        if !participate_in_backfill.is_empty() {
7385            // Configure all `replay_nodes` to run in replay mode.
7386            for node_id in replay_nodes.iter() {
7387                self.circuit
7388                    .map_local_node_mut(*node_id, &mut |node| node.start_replay())?;
7389            }
7390
7391            // Clear the state of `need_backfill` nodes.
7392            for node_id in nodes_to_backfill.iter() {
7393                self.circuit
7394                    .map_local_node_mut(*node_id, &mut |node| node.clear_state())?;
7395            }
7396
7397            // Prepare the scheduler to only run `participate_in_backfill`.
7398            self.executor
7399                .prepare(&self.circuit, Some(&participate_in_backfill))?;
7400
7401            // info!("CircuitHandle::restore: replay circuit is ready");
7402
7403            // if Runtime::worker_index() == 0 {
7404            //     self.circuit.to_dot_file(
7405            //         |node| {
7406            //             if !node.global_id().is_child_of(self.circuit.global_id()) {
7407            //                 return None;
7408            //             }
7409            //             let color = if replay_sources.contains_key(&node.local_id()) {
7410            //                 Some(0xff5555)
7411            //             } else if participate_in_backfill.contains(&node.local_id()) {
7412            //                 Some(0x5555ff)
7413            //             } else {
7414            //                 None
7415            //             };
7416            //             Some(
7417            //                 crate::utils::DotNodeAttributes::new()
7418            //                     .with_color(color)
7419            //                     .with_label(&format!("{}: {}", node.global_id(), node.name())),
7420            //             )
7421            //         },
7422            //         |edge| {
7423            //             let style = if edge.is_dependency() {
7424            //                 Some("dotted".to_string())
7425            //             } else {
7426            //                 None
7427            //             };
7428            //             let label = if let Some(stream) = &edge.stream {
7429            //                 Some(format!("consumers: {}", stream.num_consumers()))
7430            //             } else {
7431            //                 None
7432            //             };
7433            //             Some(
7434            //                 crate::utils::DotEdgeAttributes::new(edge.stream_id())
7435            //                     .with_style(style)
7436            //                     .with_label(label),
7437            //             )
7438            //         },
7439            //         "replay.dot",
7440            //     );
7441            //     println!("CircuitHandle::restore: replay circuit is written to replay.dot");
7442            // }
7443
7444            // Add persistent IDs to the need_backfill set.
7445            let need_backfill = nodes_to_backfill
7446                .iter()
7447                .map(|node_id| {
7448                    let pid = self.circuit.map_local_node_mut(*node_id, &mut |node| {
7449                        node.get_label(LABEL_PERSISTENT_OPERATOR_ID)
7450                            .map(|s| s.to_string())
7451                    });
7452
7453                    (*node_id, pid)
7454                })
7455                .collect::<BTreeMap<_, _>>();
7456
7457            let replay_info = BootstrapInfo {
7458                replay_sources: replay_sources.clone(),
7459                need_backfill,
7460            };
7461
7462            self.replay_info = Some(replay_info.clone());
7463
7464            Ok(Some(replay_info))
7465        } else {
7466            Ok(None)
7467        }
7468    }
7469
7470    /// Compute additional nodes whose state must be discarded and backfilled in order
7471    /// to ensure that the balancer state is consistent.
7472    ///
7473    /// The new version of the circuit may have a different join graph in which the partitioning
7474    /// policy used to create the checkpoint may no longer be valid.
7475    ///
7476    /// Example:
7477    ///
7478    /// Consider the following checkpointed circuit:
7479    ///
7480    /// s1 = join(a, b)
7481    /// s2 = join(c, d)
7482    ///
7483    /// where streams a and c are partitioned using PartitioningPolicy::Broadcast, and
7484    /// b and d are partitioned using PartitioningPolicy::Shard.
7485    ///
7486    /// The new circuit adds
7487    ///
7488    /// s3 = join(a, c)
7489    ///
7490    /// This new circuit is in an inconsistent state since we can't join two broadcast streams.
7491    ///
7492    /// This function computes a conservative approximation of the set of nodes whose state must
7493    /// be discarded and backfilled to avoid such inconsistencies:
7494    ///
7495    /// 1. For each join cluster, check if there exists a solution that extends the partitioning
7496    ///    policies of the nodes restores from the checkpoint.
7497    /// 2. If no solution exists, mark all nodes in the cluster and their transitive successors
7498    ///    as needing backfill.
7499    fn invalidate_balancer_clusters(
7500        &self,
7501        need_backfill: &BTreeSet<GlobalNodeId>,
7502    ) -> BTreeSet<GlobalNodeId> {
7503        // Convert GlobalNodeId to top-level NodeId for comparison with balancer data.
7504        let need_backfill_node_ids: BTreeSet<NodeId> = need_backfill
7505            .iter()
7506            .map(|gid| gid.top_level_ancestor())
7507            .collect();
7508
7509        // Compute exchange sender nodes that need to be discarded and backfilled.
7510        let additional_need_backfill = self
7511            .circuit
7512            .balancer()
7513            .invalidate_clusters_for_bootstrapping(&need_backfill_node_ids);
7514
7515        // Invalidate all successors as well; otherwise we can end up with nodes that are not
7516        // marked for backfill but their ancestors are.
7517        let nodes_to_add = self.propagate_need_backfill_forward(
7518            additional_need_backfill
7519                .difference(&need_backfill_node_ids)
7520                .cloned()
7521                .collect(),
7522        );
7523
7524        // Convert NodeIds back to GlobalNodeIds and add to additional_need_backfill
7525        nodes_to_add
7526            .into_iter()
7527            .map(|node_id| GlobalNodeId::root().child(node_id))
7528            .collect()
7529    }
7530
7531    /// Compute all transitive successors of need_backfill nodes.
7532    fn propagate_need_backfill_forward(
7533        &self,
7534        mut need_backfill: BTreeSet<NodeId>,
7535    ) -> BTreeSet<NodeId> {
7536        // Recursively add all successors of these exchange sender nodes
7537        let mut worklist: Vec<NodeId> = need_backfill.iter().cloned().collect();
7538        let mut visited = BTreeSet::new();
7539
7540        while let Some(node_id) = worklist.pop() {
7541            if visited.contains(&node_id) {
7542                continue;
7543            }
7544            visited.insert(node_id);
7545
7546            // Get all successors of this node
7547            let successors: Vec<NodeId> = self
7548                .circuit
7549                .edges()
7550                .by_source
7551                .get(&node_id)
7552                .into_iter()
7553                .flat_map(|edges| edges.iter().map(|edge| edge.to))
7554                .collect();
7555
7556            for successor in successors {
7557                if !visited.contains(&successor) {
7558                    worklist.push(successor);
7559                    need_backfill.insert(successor);
7560                }
7561            }
7562
7563            // Add all dependencies of this node. Makes sure that the output half of Z-1 is marked for backfill.
7564            let dependencies: Vec<NodeId> = self
7565                .circuit
7566                .edges()
7567                .by_destination
7568                .get(&node_id)
7569                .into_iter()
7570                .flat_map(|edges| edges.iter())
7571                .filter(|edge| edge.is_dependency())
7572                .map(|edge| edge.from)
7573                .collect();
7574
7575            for dependency in dependencies {
7576                if !visited.contains(&dependency) {
7577                    worklist.push(dependency);
7578                    need_backfill.insert(dependency);
7579                }
7580            }
7581        }
7582
7583        need_backfill
7584    }
7585
7586    /// Iterative step of computing the set of nodes that participate in the replay phase.
7587    ///
7588    /// Find all input streams of nodes in `participate_in_backfill_new`.
7589    ///
7590    /// * For streams that can be replayed from a replay source:
7591    ///  - add the replay source to `replay_sources` and `participate_in_backfill`.
7592    ///  - create replay streams from the replay source to each consumer of the original stream.
7593    /// * For other streams, add their origin nodes to `participate_in_backfill`.
7594    ///
7595    /// Return the set of nodes newly added to `participate_in_backfill`.
7596    fn compute_replay_nodes_step(
7597        &self,
7598        replay_sources: &mut BTreeMap<NodeId, StreamId>,
7599        need_backfill: &BTreeSet<NodeId>,
7600        participate_in_backfill_new: BTreeSet<NodeId>,
7601        participate_in_backfill: &mut BTreeSet<NodeId>,
7602    ) -> Result<BTreeSet<NodeId>, DbspError> {
7603        let mut inputs = BTreeSet::new();
7604
7605        for node_id in participate_in_backfill_new.iter() {
7606            // Compute immediate ancestors of node_id, including:
7607            // 1. Nodes connected to node_id by a stream.
7608            // 2. Nodes that depend on node_id -- makes sure that if we schedule the output half of a strict operator,
7609            //    we will also schedule the input half.
7610            // 3. Nodes that node_id depends on -- Makes sure that if we schedule the output of an exchange operator,
7611            //    we will schedule the input part as well.
7612
7613            // 1.
7614            let node_inputs = self
7615                .circuit
7616                .edges()
7617                .by_destination
7618                .get(node_id)
7619                .iter()
7620                .flat_map(|edges| edges.iter())
7621                .filter(|edge| edge.is_stream())
7622                .map(|edge| {
7623                    // If the origin of the stream is a node inside the nested circuit, we will clear and replay the
7624                    // entire nested circuit, as we don't currently have a way to replay state from inside a nested
7625                    // circuit into a parent circuit.
7626                    (Some(edge.stream_id().unwrap()), edge.from)
7627                })
7628                .collect::<Vec<_>>();
7629
7630            for input in node_inputs.into_iter() {
7631                inputs.insert(input);
7632            }
7633
7634            // 2.
7635            for edge in self.circuit.edges().dependencies_of(*node_id) {
7636                inputs.insert((None, edge.from));
7637            }
7638
7639            // 3.
7640            for edge in self.circuit.edges().depend_on(*node_id) {
7641                inputs.insert((None, edge.to));
7642            }
7643        }
7644
7645        let mut participate_in_backfill_new = BTreeSet::new();
7646
7647        let mut replay_streams = BTreeMap::new();
7648
7649        for (stream_id, mut node_id) in inputs.into_iter() {
7650            // println!("replay needed for ({stream_id}, {node_id})");
7651
7652            // Add all ancestors of `participate_in_backfill_new` to the `participate_in_backfill` set, except streams
7653            // that can be replayed from a different node.
7654            if let Some(stream_id) = stream_id
7655                && let Some(replay_source) = self.circuit.get_replay_source(stream_id)
7656            {
7657                // If the replay source is itself in the need_backfill set (i.e., it's an integral without
7658                // a checkpoint), it cannot be used for replay.
7659                if !need_backfill.contains(&replay_source.local_node_id()) {
7660                    replay_streams.insert(stream_id, replay_source.clone());
7661                    // trace!(
7662                    //     "worker {}: Replacing node_id {node_id} with replay source {}",
7663                    //     Runtime::worker_index(),
7664                    //     replay_source.origin_node_id()
7665                    // );
7666
7667                    // Replace the node_id with the replay source.
7668                    node_id = replay_source.local_node_id();
7669                }
7670            }
7671
7672            if !participate_in_backfill.contains(&node_id) {
7673                // println!("Adding {gid} to participate_in_backfill via {stream_id:?}");
7674                participate_in_backfill.insert(node_id);
7675                participate_in_backfill_new.insert(node_id);
7676            }
7677        }
7678
7679        // Connect `replay_streams` to all operators that consume the original stream.
7680        for (original_stream, replay_stream) in replay_streams.into_iter() {
7681            replay_sources
7682                .entry(replay_stream.local_node_id())
7683                .or_insert_with(|| {
7684                    self.circuit
7685                        .add_replay_edges(original_stream, replay_stream.as_ref());
7686                    replay_stream.stream_id()
7687                });
7688        }
7689
7690        Ok(participate_in_backfill_new)
7691    }
7692
7693    /// Returns `true` if all replay sources have completed their replay.
7694    pub fn is_replay_complete(&self) -> bool {
7695        let Some(replay_info) = self.replay_info.as_ref() else {
7696            return true;
7697        };
7698
7699        // Bootstrapping is finished when all replay sources have completed their replay and the
7700        // transaction has been committed.
7701
7702        let all_complete = replay_info.replay_sources.keys().all(|node_id| {
7703            self.circuit
7704                .map_local_node_mut(*node_id, &mut |node| node.is_replay_complete())
7705        });
7706
7707        all_complete && self.is_commit_complete()
7708    }
7709
7710    /// Finalize the replay phase of the circuit.
7711    ///
7712    /// * Notify all replay sources to go back to normal operation.
7713    /// * Delete all replay streams.
7714    /// * Prepare the scheduler to run the full circuit.
7715    pub fn complete_replay(&mut self) -> Result<(), DbspError> {
7716        // info!("Replay complete");
7717
7718        let Some(replay_info) = self.replay_info.take() else {
7719            return Ok(());
7720        };
7721
7722        // End replay mode.
7723        for (node_id, stream_id) in replay_info.replay_sources.iter() {
7724            self.circuit
7725                .map_local_node_mut(*node_id, &mut |node| node.end_replay())?;
7726            self.circuit.edges_mut().delete_stream(*stream_id);
7727        }
7728
7729        // Prepare the scheduler to run the full circuit.
7730        self.executor.prepare(&self.circuit, None)?;
7731
7732        // if Runtime::worker_index() == 0 {
7733        //     self.circuit.to_dot_file(
7734        //         |_node| Some(crate::circuit::dot::DotNodeAttributes::new()),
7735        //         |edge| {
7736        //             let style = if edge.is_dependency() {
7737        //                 Some("dotted".to_string())
7738        //             } else {
7739        //                 None
7740        //             };
7741        //             let label = if let Some(stream) = &edge.stream {
7742        //                 Some(format!("consumers: {}", stream.num_consumers()))
7743        //             } else {
7744        //                 None
7745        //             };
7746        //             Some(
7747        //                 crate::circuit::dot::DotEdgeAttributes::new(edge.stream_id())
7748        //                     .with_style(style)
7749        //                     .with_label(label),
7750        //             )
7751        //         },
7752        //         "final.dot",
7753        //     );
7754        //     info!("CircuitHandle::restore: final circuit is written to final.dot");
7755        // }
7756
7757        Ok(())
7758    }
7759
7760    pub fn fingerprint(&self) -> u64 {
7761        let mut fip = Fingerprinter::default();
7762        let _ = self.circuit.map_nodes_recursive(&mut |node: &dyn Node| {
7763            node.fingerprint(&mut fip);
7764            Ok(())
7765        });
7766        fip.finish()
7767    }
7768
7769    /// Attach a scheduler event handler to the circuit.
7770    ///
7771    /// This method is identical to
7772    /// [`RootCircuit::register_scheduler_event_handler`], but it can be used at
7773    /// runtime, after the circuit has been fully constructed.
7774    ///
7775    /// Use [`RootCircuit::register_scheduler_event_handler`],
7776    /// [`RootCircuit::unregister_scheduler_event_handler`], to manipulate
7777    /// handlers during circuit construction.
7778    pub fn register_scheduler_event_handler<F>(&self, name: &str, handler: F)
7779    where
7780        F: FnMut(&SchedulerEvent<'_>) + 'static,
7781    {
7782        self.circuit.register_scheduler_event_handler(name, handler);
7783    }
7784
    /// Remove a scheduler event handler.
    ///
    /// This method is identical to
    /// [`RootCircuit::unregister_scheduler_event_handler`], but it can be used
    /// at runtime, after the circuit has been fully constructed.
    ///
    /// `name` is the name the handler was registered under.  The returned
    /// `bool` is forwarded unchanged from the root circuit; presumably `true`
    /// means a handler with that name existed and was removed (TODO confirm
    /// against [`RootCircuit::unregister_scheduler_event_handler`]).
    pub fn unregister_scheduler_event_handler(&self, name: &str) -> bool {
        self.circuit.unregister_scheduler_event_handler(name)
    }
7793
7794    /// Export circuit in LIR format.
7795    pub fn lir(&self) -> LirCircuit {
7796        (&self.circuit as &dyn CircuitBase).to_lir()
7797    }
7798
    /// Forwards `enable` to the root circuit's `set_auto_rebalance` and returns
    /// its result unchanged (presumably toggles automatic rebalancing; TODO
    /// confirm against the root circuit's documentation).
    ///
    /// # Errors
    ///
    /// Propagates any [`DbspError`] reported by the root circuit.
    pub fn set_auto_rebalance(&self, enable: bool) -> Result<(), DbspError> {
        self.circuit.set_auto_rebalance(enable)
    }
7802
7803    pub fn set_balancer_hint_by_global_id(
7804        &self,
7805        global_node_id: &GlobalNodeId,
7806        hint: BalancerHint,
7807    ) -> Result<(), DbspError> {
7808        self.circuit
7809            .set_balancer_hint_by_global_id(global_node_id, hint)
7810    }
7811
    /// Sets a balancer hint for the node with the given persistent id.
    ///
    /// Counterpart of `set_balancer_hint_by_global_id` that addresses the node
    /// by its persistent id string instead of a [`GlobalNodeId`].  The call is
    /// forwarded to the root circuit, and its result is returned unchanged.
    ///
    /// # Errors
    ///
    /// Propagates any [`DbspError`] reported by the root circuit (presumably
    /// including the case where no node has this persistent id; TODO confirm).
    pub fn set_balancer_hint(
        &self,
        persistent_id: &str,
        hint: BalancerHint,
    ) -> Result<(), DbspError> {
        self.circuit.set_balancer_hint(persistent_id, hint)
    }
7819
7820    pub fn get_current_balancer_policies(&self) -> BTreeMap<GlobalNodeId, PartitioningPolicy> {
7821        self.circuit
7822            .get_current_balancer_policies()
7823            .into_iter()
7824            .map(|(node_id, policy)| (GlobalNodeId::root().child(node_id), policy))
7825            .collect()
7826    }
7827
    /// Returns the partitioning policy currently in effect for the node with
    /// the given persistent id.
    ///
    /// The lookup is delegated to the root circuit, and its result is returned
    /// unchanged.
    ///
    /// # Errors
    ///
    /// Propagates any [`DbspError`] reported by the root circuit (presumably
    /// when no node has this persistent id; TODO confirm).
    pub fn get_current_balancer_policy(
        &self,
        persistent_id: &str,
    ) -> Result<PartitioningPolicy, DbspError> {
        self.circuit.get_current_balancer_policy(persistent_id)
    }
7834
7835    pub fn rebalance(&self) {
7836        self.circuit.rebalance()
7837    }
7838
7839    pub fn start_compaction(&self) {
7840        self.circuit.start_compaction()
7841    }
7842}
7843
pin_project! {
    /// An async task that measures its runtime.
    ///
    /// This uses the same wrapper technique as [tokio_metrics::Instrumented] or
    /// [tracing::Instrument]: by implementing [Future] manually, it wraps each
    /// poll with elapsed time measurement.  When the future eventually
    /// completes, it outputs the wrapped future's output value plus the total
    /// elapsed time.
    ///
    /// Only wall-clock time spent inside the wrapped future's `poll` calls is
    /// counted.  Time spent waiting between polls is not included.  The
    /// accumulated total is moved out (leaving zero behind) when the wrapped
    /// future completes.
    ///
    /// [tokio_metrics::Instrumented]: https://docs.rs/tokio-metrics/latest/tokio_metrics/struct.Instrumented.html
    /// [tracing::Instrument]: https://docs.rs/tracing/latest/tracing/trait.Instrument.html
    #[derive(Debug)]
    pub struct Timed<T> {
        // The task being instrumented.  Structurally pinned so that it can be
        // polled through the projection.
        #[pin]
        task: T,

        // Time spent running this task, summed over all polls so far.  It is
        // reset to zero when handed out with the task's output.
        elapsed: Duration,
    }
}
7865
7866impl<T> Timed<T> {
7867    fn new(task: T) -> Self {
7868        Self {
7869            task,
7870            elapsed: Duration::ZERO,
7871        }
7872    }
7873}
7874
7875impl<T> Future for Timed<T>
7876where
7877    T: Future,
7878{
7879    type Output = (T::Output, Duration);
7880
7881    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
7882        let this = self.project();
7883        let start = Instant::now();
7884        let ret = this.task.poll(cx);
7885        *this.elapsed += start.elapsed();
7886        ret.map(|value| (value, take(&mut *this.elapsed)))
7887    }
7888}
7889
#[cfg(test)]
mod tests {
    use crate::{
        Circuit, Error as DbspError, RootCircuit,
        circuit::schedule::{DynamicScheduler, Scheduler},
        monitor::TraceMonitor,
        operator::{Generator, Z1},
    };
    use anyhow::anyhow;
    use std::{cell::RefCell, ops::Deref, rc::Rc, vec::Vec};

    #[test]
    fn sum_circuit_dynamic() {
        sum_circuit::<DynamicScheduler>();
    }

    /// Integrates the stream `0, 1, 2, ...` for 100 steps and checks that the
    /// output at step `i` is the sum of the numbers from 0 to `i`.
    fn sum_circuit<S>()
    where
        S: Scheduler + 'static,
    {
        let actual_output: Rc<RefCell<Vec<isize>>> = Rc::new(RefCell::new(Vec::with_capacity(100)));
        let actual_output_clone = actual_output.clone();
        let circuit = RootCircuit::build_with_scheduler::<_, _, S>(|circuit| {
            TraceMonitor::new_panic_on_error().attach(circuit, "monitor");
            let mut n: isize = 0;
            let source = circuit.add_source(Generator::new(move || {
                let result = n;
                n += 1;
                result
            }));
            let integrator = source.integrate();
            integrator.inspect(|n| println!("{}", n));
            integrator.inspect(move |n| actual_output_clone.borrow_mut().push(*n));
            Ok(())
        })
        .unwrap()
        .0;

        for _ in 0..100 {
            circuit.transaction().unwrap();
        }

        let mut sum = 0;
        let mut expected_output: Vec<isize> = Vec::with_capacity(100);
        for i in 0..100 {
            sum += i;
            expected_output.push(sum);
        }
        assert_eq!(&expected_output, actual_output.borrow().deref());
    }

    #[test]
    fn recursive_sum_circuit_dynamic() {
        recursive_sum_circuit::<DynamicScheduler>()
    }

    /// Same computation as `sum_circuit`, but the running sum is built
    /// explicitly from a `Z1` feedback loop instead of `integrate`.
    fn recursive_sum_circuit<S>()
    where
        S: Scheduler + 'static,
    {
        let actual_output: Rc<RefCell<Vec<usize>>> = Rc::new(RefCell::new(Vec::with_capacity(100)));
        let actual_output_clone = actual_output.clone();

        let circuit = RootCircuit::build_with_scheduler::<_, _, S>(|circuit| {
            TraceMonitor::new_panic_on_error().attach(circuit, "monitor");

            let mut n: usize = 0;
            let source = circuit.add_source(Generator::new(move || {
                let result = n;
                n += 1;
                result
            }));
            let (z1_output, z1_feedback) = circuit.add_feedback(Z1::new(0));
            let plus = source
                .apply2(&z1_output, |n1: &usize, n2: &usize| *n1 + *n2)
                .inspect(move |n| actual_output_clone.borrow_mut().push(*n));
            z1_feedback.connect(&plus);
            Ok(())
        })
        .unwrap()
        .0;

        for _ in 0..100 {
            circuit.transaction().unwrap();
        }

        let mut sum = 0;
        let mut expected_output: Vec<usize> = Vec::with_capacity(100);
        for i in 0..100 {
            sum += i;
            expected_output.push(sum);
        }
        assert_eq!(&expected_output, actual_output.borrow().deref());
    }

    #[test]
    fn factorial_dynamic() {
        factorial::<DynamicScheduler>();
    }

    // Nested circuit.  The root circuit contains a source node that counts up
    // from 1.  For each `n` output by the source node, the nested circuit
    // computes factorial(n).  `delta0` imports `n` into the child circuit, and
    // a stateful `apply_mut` counts from `n` down to 1.  The closure only
    // resets its counter on a positive input, which suggests `delta0` yields 0
    // after the first iteration (TODO confirm).  A multiplier multiplies each
    // count by the product computed so far (stored in z-1).  Iteration stops
    // once the countdown reaches 1, and the final product is exported.
    fn factorial<S>()
    where
        S: Scheduler + 'static,
    {
        let actual_output: Rc<RefCell<Vec<usize>>> = Rc::new(RefCell::new(Vec::with_capacity(100)));
        let actual_output_clone = actual_output.clone();

        let circuit = RootCircuit::build_with_scheduler::<_, _, S>(|circuit| {
            TraceMonitor::new_panic_on_error().attach(circuit, "monitor");

            let mut n: usize = 0;
            let source = circuit.add_source(Generator::new(move || {
                n += 1;
                n
            }));
            let fact = circuit
                .iterate_with_condition_and_scheduler::<_, _, S>(|child| {
                    let mut counter = 0;
                    let countdown = source.delta0(child).apply_mut(move |parent_val| {
                        if *parent_val > 0 {
                            counter = *parent_val;
                        };
                        let res = counter;
                        counter -= 1;
                        res
                    });
                    let (z1_output, z1_feedback) = child.add_feedback_with_export(Z1::new(1));
                    let mul = countdown.apply2(&z1_output.local, |n1: &usize, n2: &usize| n1 * n2);
                    z1_feedback.connect(&mul);
                    Ok((countdown.condition(|n| *n <= 1), z1_output.export))
                })
                .unwrap();
            fact.inspect(move |n| actual_output_clone.borrow_mut().push(*n));
            Ok(())
        })
        .unwrap()
        .0;

        // One transaction per source value 1..=9.
        for _ in 1..10 {
            circuit.transaction().unwrap();
        }

        let expected_output: Vec<usize> = (1..10).map(my_factorial).collect();
        assert_eq!(&expected_output, actual_output.borrow().deref());
    }

    /// Reference implementation of `n!`, with `0! == 1` (an empty product).
    fn my_factorial(n: usize) -> usize {
        (1..=n).product()
    }

    /// Verify that operators added across multiple `open_region`/`close_region`
    /// calls on the same [`RegionName`] all land in the same region tree node.
    #[test]
    fn open_close_region() {
        const REGION: &str = "my_region";

        let monitor = TraceMonitor::new_panic_on_error();
        let region: Rc<RefCell<Option<super::RegionName>>> = Rc::new(RefCell::new(None));

        let _circuit = RootCircuit::build({
            let monitor = monitor.clone();
            let region = region.clone();
            move |circuit| {
                monitor.attach(circuit, "monitor");

                let r = circuit.create_region_name(REGION, 0);

                circuit.open_region(r.clone());
                let source1 = circuit.add_source(Generator::new(|| 1_i32));
                circuit.close_region(r.clone());

                circuit.open_region(r.clone());
                let source2 = circuit.add_source(Generator::new(|| 2_i32));
                circuit.close_region(r.clone());

                circuit.open_region(r.clone());
                source1
                    .apply2(&source2, |a: &i32, b: &i32| a + b)
                    .inspect(|_| {});
                circuit.close_region(r.clone());

                *region.borrow_mut() = Some(r);
                Ok(())
            }
        })
        .unwrap()
        .0;

        // source1, source2, apply2, inspect — all in the single "my_region" node.
        let region = region.borrow();
        assert_eq!(
            monitor.count_nodes_in_region(region.as_ref().unwrap()),
            Some(4)
        );
    }

    /// Verify that two `create_region_name` calls with the same name (but
    /// different second arguments) produce independent region nodes:
    /// operators added via each handle land in different nodes, not the same
    /// one.
    #[test]
    fn separate_create_region_same_name() {
        const REGION: &str = "my_region";

        let monitor = TraceMonitor::new_panic_on_error();
        // RegionNames are created inside the build closure; smuggle them out via Rc<RefCell>.
        let r1: Rc<RefCell<Option<super::RegionName>>> = Rc::new(RefCell::new(None));
        let r2: Rc<RefCell<Option<super::RegionName>>> = Rc::new(RefCell::new(None));

        let _circuit = RootCircuit::build({
            let monitor = monitor.clone();
            let r1 = r1.clone();
            let r2 = r2.clone();
            move |circuit| {
                monitor.attach(circuit, "monitor");

                let region1 = circuit.create_region_name(REGION, 0);
                let region2 = circuit.create_region_name(REGION, 1);

                circuit.open_region(region1.clone());
                circuit.add_source(Generator::new(|| 1_i32));
                circuit.close_region(region1.clone());

                circuit.open_region(region2.clone());
                circuit.add_source(Generator::new(|| 2_i32));
                circuit.close_region(region2.clone());

                *r1.borrow_mut() = Some(region1);
                *r2.borrow_mut() = Some(region2);
                Ok(())
            }
        })
        .unwrap()
        .0;

        let r1 = r1.borrow();
        let r2 = r2.borrow();
        // Two distinct region nodes, each containing one operator.
        assert_eq!(monitor.count_regions_with_name(REGION), 2);
        assert_eq!(monitor.count_nodes_in_region(r1.as_ref().unwrap()), Some(1));
        assert_eq!(monitor.count_nodes_in_region(r2.as_ref().unwrap()), Some(1));
    }

    /// A failing constructor callback must surface as
    /// `DbspError::Constructor`, carrying the callback's error.
    #[test]
    fn init_circuit_constructor_error() {
        let result = RootCircuit::build(|_circuit| Err::<(), _>(anyhow!("constructor failed")));
        let Err(DbspError::Constructor(msg)) = result else {
            panic!("expected `DbspError::Constructor` when the constructor callback fails");
        };
        assert_eq!(msg.to_string(), "constructor failed");
    }
}