use crate::{
Error as DbspError, Position, Runtime,
circuit::{
cache::{CircuitCache, CircuitStoreMarker},
fingerprinter::Fingerprinter,
metadata::OperatorMeta,
metrics::DBSP_OPERATOR_COMMIT_LATENCY_MICROSECONDS,
operator_traits::{
BinaryOperator, BinarySinkOperator, Data, ImportOperator, NaryOperator,
QuaternaryOperator, SinkOperator, SourceOperator, StrictUnaryOperator, TernaryOperator,
TernarySinkOperator, UnaryOperator,
},
runtime::Consensus,
schedule::{
CommitProgress, DynamicScheduler, Error as SchedulerError, Executor, IterativeExecutor,
OnceExecutor, Scheduler,
},
trace::{CircuitEvent, SchedulerEvent},
},
circuit_cache_key,
ir::LABEL_MIR_NODE_ID,
operator::dynamic::balance::{Balancer, BalancerError, BalancerHint, PartitioningPolicy},
samply::SamplySpan,
time::{Timestamp, UnitTimestamp},
};
#[cfg(doc)]
use crate::{
InputHandle, OutputHandle,
algebra::{IndexedZSet, ZSet},
operator::{Aggregator, Fold, Generator, Max, Min, time_series::RelRange},
trace::Batch,
};
use anyhow::Error as AnyError;
use dyn_clone::{DynClone, clone_box};
use feldera_ir::{LirCircuit, LirNodeId};
use feldera_storage::{FileCommitter, StoragePath};
use serde::{Deserialize, Serialize, Serializer, de::DeserializeOwned};
use std::{
any::{Any, TypeId, type_name_of_val},
borrow::Cow,
cell::{Cell, Ref, RefCell, RefMut},
collections::{BTreeMap, BTreeSet, HashMap},
fmt::{self, Debug, Display, Write},
future::Future,
io::ErrorKind,
marker::PhantomData,
mem::transmute,
ops::Deref,
panic::Location,
pin::Pin,
rc::Rc,
sync::Arc,
thread::panicking,
};
use tokio::{runtime::Runtime as TokioRuntime, task::LocalSet};
use tracing::debug;
use typedmap::{TypedMap, TypedMapKey};
use super::dbsp_handle::Mode;
/// Label key under which a node's persistent operator id is stored
/// (see `Stream::set_persistent_id` and `Node::persistent_id`).
const LABEL_PERSISTENT_OPERATOR_ID: &str = "persistent_id";
/// Slot holding the current value of a stream together with the bookkeeping
/// needed to hand that value to several consumers.
struct StreamValue<D> {
    /// Value written by the producer; `None` when nothing has been written,
    /// or after the value was taken or the last token was consumed.
    val: Option<D>,
    /// Number of consumers registered on the stream.
    consumers: usize,
    /// Number of consumers that have not yet consumed the current value.
    tokens: Cell<usize>,
}

impl<D> StreamValue<D> {
    /// Creates a slot with no value, no consumers and no outstanding tokens.
    const fn empty() -> Self {
        Self {
            val: None,
            consumers: 0,
            tokens: Cell::new(0),
        }
    }

    /// Stores `val` and issues one token per registered consumer.
    ///
    /// When no consumer is registered the value is dropped and the slot is
    /// left untouched. In debug builds, asserts that the slot is empty.
    fn put(&mut self, val: D) {
        debug_assert!(self.val.is_none());
        if self.consumers == 0 {
            return;
        }
        self.tokens.set(self.consumers);
        self.val = Some(val);
    }

    /// Borrows the stored value.
    ///
    /// # Panics
    ///
    /// Panics if the slot holds no value. In debug builds, also asserts that
    /// at least one token is outstanding.
    fn peek<R>(this: &R) -> &D
    where
        R: Deref<Target = Self>,
    {
        let slot: &Self = this.deref();
        debug_assert_ne!(slot.tokens.get(), 0);
        slot.val.as_ref().unwrap()
    }

    /// Moves the value out of the slot, but only when exactly one token is
    /// outstanding; otherwise returns `None` and leaves the value in place.
    ///
    /// The token count itself is not modified here.
    fn take(this: &RefCell<Self>) -> Option<D>
    where
        D: Clone,
    {
        let outstanding = this.borrow().tokens.get();
        debug_assert_ne!(outstanding, 0);
        match outstanding {
            1 => Some(this.borrow_mut().val.take().unwrap()),
            _ => None,
        }
    }

    /// Consumes one token; when the last token is consumed, any value still
    /// stored in the slot is dropped.
    fn consume_token(this: &RefCell<Self>) {
        let exhausted = {
            let slot = this.borrow();
            let before = slot.tokens.get();
            debug_assert_ne!(before, 0);
            let after = before - 1;
            slot.tokens.set(after);
            after == 0
        };
        // The shared borrow above must end before the slot is mutated.
        if exhausted {
            this.borrow_mut().val.take();
        }
    }
}
/// Shared, reference-counted handle to a `StreamValue` slot.
///
/// Cloning the handle yields another reference to the same slot.
#[repr(transparent)]
pub struct RefStreamValue<D>(Rc<RefCell<StreamValue<D>>>);

impl<D> Clone for RefStreamValue<D> {
    /// Returns a new handle pointing at the same slot.
    fn clone(&self) -> Self {
        Self(Rc::clone(&self.0))
    }
}

impl<D> RefStreamValue<D> {
    /// Creates a handle to a fresh, empty slot.
    pub fn empty() -> Self {
        let slot = StreamValue::empty();
        Self(Rc::new(RefCell::new(slot)))
    }

    /// Mutably borrows the slot; panics if it is already borrowed.
    fn get_mut(&self) -> RefMut<'_, StreamValue<D>> {
        self.0.borrow_mut()
    }

    /// Immutably borrows the slot; panics if it is mutably borrowed.
    fn get(&self) -> Ref<'_, StreamValue<D>> {
        self.0.borrow()
    }

    /// Writes `d` into the slot (see `StreamValue::put`).
    pub fn put(&self, d: D) {
        self.get_mut().put(d);
    }

    /// Reinterprets the handle as a handle to a slot of `D2`.
    ///
    /// # Safety
    ///
    /// The returned handle aliases the same allocation, so the caller must
    /// guarantee that `StreamValue<D>` and `StreamValue<D2>` are
    /// layout-compatible and that values written as one type are valid when
    /// read as the other.
    unsafe fn transmute<D2>(&self) -> RefStreamValue<D2> {
        let shared = self.0.clone();
        // SAFETY: upheld by the caller, see the `# Safety` section above.
        unsafe {
            RefStreamValue(std::mem::transmute::<
                Rc<RefCell<StreamValue<D>>>,
                Rc<RefCell<StreamValue<D2>>>,
            >(shared))
        }
    }
}
/// Type-erased view of a stream: its identity and consumer bookkeeping,
/// without exposing the payload type.
pub trait StreamMetadata: DynClone + 'static {
/// Id of the stream.
fn stream_id(&self) -> StreamId;
/// Local id of the node the stream is attached to.
fn local_node_id(&self) -> NodeId;
/// Global id of the node the stream originates from.
fn origin_node_id(&self) -> &GlobalNodeId;
/// Number of consumers currently registered on the stream.
fn num_consumers(&self) -> usize;
/// Resets the registered-consumer count to zero.
fn clear_consumer_count(&self);
/// Registers one additional consumer.
fn register_consumer(&self);
/// Records that one consumer is done with the current value.
fn consume_token(&self);
}
// Makes `Box<dyn StreamMetadata>` implement `Clone` via `dyn_clone`.
dyn_clone::clone_trait_object!(StreamMetadata);
/// Handle to a stream of values of type `D` inside circuit `C`.
pub struct Stream<C, D> {
/// Id of this stream.
stream_id: StreamId,
/// Id of the node the stream is attached to, local to `circuit`.
local_node_id: NodeId,
/// Global id of the node that produces the stream.
origin_node_id: GlobalNodeId,
/// Circuit the stream belongs to.
circuit: C,
/// Shared slot holding the stream's current value.
val: RefStreamValue<D>,
}
/// Type-erased metadata access for a concrete stream.
impl<C, D> StreamMetadata for Stream<C, D>
where
    C: Clone + 'static,
    D: 'static,
{
    fn stream_id(&self) -> StreamId {
        self.stream_id
    }

    fn local_node_id(&self) -> NodeId {
        self.local_node_id
    }

    fn origin_node_id(&self) -> &GlobalNodeId {
        &self.origin_node_id
    }

    /// Resets the consumer count stored in the shared value slot.
    fn clear_consumer_count(&self) {
        let mut slot = self.val.get_mut();
        slot.consumers = 0;
    }

    /// Reads the consumer count stored in the shared value slot.
    fn num_consumers(&self) -> usize {
        let slot = self.val.get();
        slot.consumers
    }

    /// Increments the consumer count stored in the shared value slot.
    fn register_consumer(&self) {
        let mut slot = self.val.get_mut();
        slot.consumers += 1;
    }

    /// Consumes one token of the current value (see
    /// `StreamValue::consume_token`).
    fn consume_token(&self) {
        StreamValue::consume_token(&self.val.0);
    }
}
impl<C, D> Clone for Stream<C, D>
where
    C: Clone,
{
    /// Clones the handle; the clone shares the same value slot and carries
    /// the same stream and node ids.
    fn clone(&self) -> Self {
        let Self {
            stream_id,
            local_node_id,
            origin_node_id,
            circuit,
            val,
        } = self;
        Self {
            stream_id: *stream_id,
            local_node_id: *local_node_id,
            origin_node_id: origin_node_id.clone(),
            circuit: circuit.clone(),
            val: val.clone(),
        }
    }
}
impl<C, D> Stream<C, D>
where
    C: Clone,
{
    /// Returns a handle to the same stream whose payload is reinterpreted as
    /// `D2`. Ids and circuit are copied; the value slot is shared.
    ///
    /// # Safety
    ///
    /// The caller must uphold the contract of `RefStreamValue::transmute`:
    /// `D` and `D2` must be interchangeable for every value that passes
    /// through the shared slot.
    pub(crate) unsafe fn transmute_payload<D2>(&self) -> Stream<C, D2> {
        Stream {
            stream_id: self.stream_id,
            local_node_id: self.local_node_id,
            origin_node_id: self.origin_node_id.clone(),
            circuit: self.circuit.clone(),
            // SAFETY: forwarded to the caller, see `# Safety` above.
            val: unsafe { self.val.transmute::<D2>() },
        }
    }
}
impl<C, D> Stream<C, D> {
    /// Id of the node the stream is attached to, local to its circuit.
    pub fn local_node_id(&self) -> NodeId {
        self.local_node_id
    }

    /// Global id of the node that produces the stream.
    pub fn origin_node_id(&self) -> &GlobalNodeId {
        &self.origin_node_id
    }

    /// Id of the stream.
    pub fn stream_id(&self) -> StreamId {
        self.stream_id
    }

    /// Circuit the stream belongs to.
    pub fn circuit(&self) -> &C {
        &self.circuit
    }

    /// Returns `true` when both handles carry the same stream id, regardless
    /// of their payload types.
    pub fn ptr_eq<D2>(&self, other: &Stream<C, D2>) -> bool {
        self.stream_id == other.stream_id
    }
}
impl<C, D> Stream<C, D>
where
    C: Circuit,
{
    /// Creates a stream attached to node `node_id` of `circuit`, with a
    /// freshly allocated stream id and an empty value slot.
    pub(crate) fn new(circuit: C, node_id: NodeId) -> Self {
        Self::with_value(circuit, node_id, RefStreamValue::empty())
    }

    /// Like `new`, but uses the supplied value slot instead of creating an
    /// empty one.
    pub fn with_value(circuit: C, node_id: NodeId, val: RefStreamValue<D>) -> Self {
        let stream_id = circuit.allocate_stream_id();
        let origin_node_id = GlobalNodeId::child_of(&circuit, node_id);
        Self {
            stream_id,
            local_node_id: node_id,
            origin_node_id,
            circuit,
            val,
        }
    }

    /// Returns another handle to the stream's value slot.
    pub fn value(&self) -> RefStreamValue<D> {
        self.val.clone()
    }

    /// Looks up the parent-circuit stream registered in the circuit cache
    /// under this stream's `ExportId`.
    ///
    /// # Panics
    ///
    /// Panics (via `unimplemented!`) when no export has been cached for this
    /// stream.
    pub fn export(&self) -> Stream<C::Parent, D>
    where
        C::Parent: Circuit,
        D: 'static,
    {
        let key = ExportId::new(self.stream_id());
        self.circuit()
            .cache_get_or_insert_with(key, || unimplemented!())
            .clone()
    }

    /// Sets label `key` to `val` on the node that produces the stream and
    /// returns a clone of the stream handle.
    pub fn set_label(&self, key: &str, val: &str) -> Self {
        let origin = &self.origin_node_id;
        self.circuit.set_node_label(origin, key, val);
        self.clone()
    }

    /// Reads label `key` of the node that produces the stream.
    pub fn get_label(&self, key: &str) -> Option<String> {
        let origin = &self.origin_node_id;
        self.circuit.get_node_label(origin, key)
    }

    /// Sets the persistent-id label of the producing node when `name` is
    /// `Some`; with `None` nothing is changed. Returns a clone of the handle
    /// either way.
    pub fn set_persistent_id(&self, name: Option<&str>) -> Self {
        match name {
            Some(name) => self.set_label(LABEL_PERSISTENT_OPERATOR_ID, name),
            None => self.clone(),
        }
    }

    /// Reads the persistent-id label of the producing node, if set.
    pub fn get_persistent_id(&self) -> Option<String> {
        self.get_label(LABEL_PERSISTENT_OPERATOR_ID)
    }
}
impl<C, D> Stream<C, D> {
    /// Builds a stream from explicit ids, with an empty value slot.
    ///
    /// Unlike `new`, neither the stream id nor the origin node id is derived
    /// from `circuit`; both are taken verbatim from the arguments.
    fn with_origin(
        circuit: C,
        stream_id: StreamId,
        node_id: NodeId,
        origin_node_id: GlobalNodeId,
    ) -> Self {
        let val = RefStreamValue::empty();
        Self {
            stream_id,
            local_node_id: node_id,
            origin_node_id,
            circuit,
            val,
        }
    }
}
impl<C, D> Stream<C, D> {
pub(crate) fn map_value<T>(&self, f: impl Fn(&D) -> T) -> T {
f(StreamValue::peek(&self.get()))
}
fn get(&self) -> Ref<'_, StreamValue<D>> {
self.val.get()
}
fn val(&self) -> &RefCell<StreamValue<D>> {
&self.val.0
}
pub(crate) fn put(&self, d: D) {
self.val.put(d);
}
}
/// Pair of handles for a stream that exists both in a circuit and in that
/// circuit's parent.
pub struct ExportStream<C, D>
where
C: Circuit,
{
/// Stream inside circuit `C`.
pub local: Stream<C, D>,
/// Corresponding stream inside the parent circuit `C::Parent`.
pub export: Stream<C::Parent, D>,
}
/// Integer identifying a clock scope; passed to `clock_start`, `clock_end`
/// and `fixedpoint` style methods.
pub type Scope = u16;
/// A node of a circuit graph.
///
/// Required methods expose the node's identity, evaluation and transaction
/// hooks, checkpoint/restore hooks and labels. Several methods have default
/// implementations suited to leaf (non-circuit) nodes.
pub trait Node: Any {
/// Id of the node within its circuit.
fn local_id(&self) -> NodeId;
/// Global id of the node.
fn global_id(&self) -> &GlobalNodeId;
/// Worker-qualified persistent identifier of the node.
///
/// In `Mode::Ephemeral` the id is `"{worker_index}-{path}"`, built from the
/// node's global path, and is always `Some`. In `Mode::Persistent` it is
/// `"{worker_index}-{label}"`, built from the `persistent_id` label, and is
/// `None` when that label is not set.
fn persistent_id(&self) -> Option<String> {
let worker_index = Runtime::worker_index();
match Runtime::mode() {
Mode::Ephemeral => Some(format!(
"{worker_index}-{}",
self.global_id().path_as_string()
)),
Mode::Persistent => self
.get_label(LABEL_PERSISTENT_OPERATOR_ID)
.map(|operator_id| format!("{worker_index}-{operator_id}")),
}
}
/// Name of the node.
fn name(&self) -> Cow<'static, str>;
/// Whether the node is itself a circuit; `false` by default.
fn is_circuit(&self) -> bool {
false
}
/// Whether the node is an input node.
fn is_input(&self) -> bool;
/// Whether the node is asynchronous.
fn is_async(&self) -> bool;
/// Whether the node is ready — presumably to be evaluated; confirm with
/// the scheduler.
fn ready(&self) -> bool;
/// Registers a callback related to readiness; the default implementation
/// ignores it.
fn register_ready_callback(&mut self, _cb: Box<dyn Fn() + Send + Sync>) {}
/// Evaluates the node. The returned future resolves to an optional
/// `Position` or a scheduler error.
fn eval<'a>(
&'a mut self,
) -> Pin<Box<dyn Future<Output = Result<Option<Position>, SchedulerError>> + 'a>>;
/// Import hook; does nothing by default.
fn import(&mut self) {}
/// Notifies the node that a transaction is starting.
fn start_transaction(&mut self);
/// Asks the node to flush.
fn flush(&mut self);
/// Whether a previously requested flush has completed.
fn is_flush_complete(&self) -> bool;
/// Notifies the node that the clock of `scope` starts.
fn clock_start(&mut self, scope: Scope);
/// Notifies the node that the clock of `scope` ends.
fn clock_end(&mut self, scope: Scope);
/// Initialization hook; does nothing by default.
fn init(&mut self) {}
/// Writes the node's metadata into `output`.
fn metadata(&self, output: &mut OperatorMeta);
/// Whether the node has reached a fixed point in `scope`.
fn fixedpoint(&self, scope: Scope) -> bool;
/// Applies `_f` to nested nodes; the default does nothing and returns
/// `Ok(())`, which fits a node with no children.
fn map_nodes_recursive(
&self,
_f: &mut dyn FnMut(&dyn Node) -> Result<(), DbspError>,
) -> Result<(), DbspError> {
Ok(())
}
/// Mutable counterpart of `map_nodes_recursive`; the default does nothing
/// and returns `Ok(())`.
fn map_nodes_recursive_mut(
&self,
_f: &mut dyn FnMut(&mut dyn Node) -> Result<(), DbspError>,
) -> Result<(), DbspError> {
Ok(())
}
/// Checkpoints the node under `base`; `files` is an output list of file
/// committers.
fn checkpoint(
&mut self,
base: &StoragePath,
files: &mut Vec<Arc<dyn FileCommitter>>,
) -> Result<(), DbspError>;
/// Restores the node from a checkpoint under `base`.
fn restore(&mut self, base: &StoragePath) -> Result<(), DbspError>;
/// Clears the node's state.
fn clear_state(&mut self) -> Result<(), DbspError>;
/// Starts replay.
fn start_replay(&mut self) -> Result<(), DbspError>;
/// Whether replay has completed.
fn is_replay_complete(&self) -> bool;
/// Ends replay.
fn end_replay(&mut self) -> Result<(), DbspError>;
/// Feeds the node into the fingerprinter; by default only the type name of
/// the implementing value is hashed.
fn fingerprint(&self, fip: &mut Fingerprinter) {
fip.hash(type_name_of_val(self));
}
/// Sets label `key` to `value`.
fn set_label(&mut self, key: &str, value: &str);
/// Reads label `key`, if set.
fn get_label(&self, key: &str) -> Option<&str>;
/// All labels of the node.
fn labels(&self) -> &BTreeMap<String, String>;
/// Applies `_f` to the descendant at `_path`.
///
/// # Panics
///
/// The default implementation always panics: it is only meaningful for
/// circuit nodes.
fn map_child(&self, _path: &[NodeId], _f: &mut dyn FnMut(&dyn Node)) {
panic!("map_child: not a circuit node")
}
/// Mutable counterpart of `map_child`; the default always panics.
fn map_child_mut(&self, _path: &[NodeId], _f: &mut dyn FnMut(&mut dyn Node)) {
panic!("map_child_mut: not a circuit node")
}
/// Returns the node as a circuit, if it is one; `None` by default.
fn as_circuit(&self) -> Option<&dyn CircuitBase> {
None
}
/// Returns the node as `&dyn Any`, for downcasting.
fn as_any(&self) -> &dyn Any;
}
/// Identifier of a stream, wrapping a plain `usize`.
#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord, Hash, Deserialize, Serialize)]
#[repr(transparent)]
pub struct StreamId(usize);

impl StreamId {
    /// Wraps a raw id.
    pub fn new(id: usize) -> Self {
        StreamId(id)
    }

    /// Returns the raw id.
    pub fn id(&self) -> usize {
        let Self(raw) = *self;
        raw
    }
}

impl Display for StreamId {
    /// Writes `s` followed by the raw id; formatter flags apply to the
    /// number only.
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        let Self(raw) = self;
        f.write_char('s')?;
        Debug::fmt(raw, f)
    }
}
/// Identifier of a node within a circuit, wrapping a plain `usize`.
#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord, Hash, Serialize, Deserialize)]
#[repr(transparent)]
pub struct NodeId(usize);

impl NodeId {
    /// Wraps a raw id.
    pub fn new(id: usize) -> Self {
        NodeId(id)
    }

    /// Returns the raw id.
    pub fn id(&self) -> usize {
        let Self(raw) = *self;
        raw
    }

    /// Id `0`, used for the root.
    pub(super) fn root() -> Self {
        NodeId(0)
    }
}

impl Display for NodeId {
    /// Writes `n` followed by the raw id; formatter flags apply to the
    /// number only.
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        let Self(raw) = self;
        f.write_char('n')?;
        Debug::fmt(raw, f)
    }
}
/// Path of `NodeId`s identifying a node across nested circuits.
#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord, Hash)]
#[repr(transparent)]
pub struct GlobalNodeId(Vec<NodeId>);

impl Serialize for GlobalNodeId {
    /// Serializes the id as the string produced by `node_identifier`.
    fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
    where
        S: Serializer,
    {
        serializer.serialize_str(self.node_identifier().as_str())
    }
}
impl Display for GlobalNodeId {
    /// Formats the path as the raw node numbers separated by dots and
    /// wrapped in brackets, e.g. `[0.3.1]`; an empty path is `[]`.
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        f.write_str("[")?;
        for (index, node_id) in self.path().iter().enumerate() {
            // The separator goes between components, never after the last.
            if index > 0 {
                f.write_str(".")?;
            }
            // Write the number straight to the formatter; no temporary
            // `String` per component is needed.
            write!(f, "{}", node_id.0)?;
        }
        f.write_str("]")
    }
}
impl GlobalNodeId {
pub fn from_path(path: &[NodeId]) -> Self {
Self(path.to_owned())
}
pub fn from_path_vec(path: Vec<NodeId>) -> Self {
Self(path)
}
pub fn root() -> Self {
Self(Vec::new())
}
pub fn child(&self, child_id: NodeId) -> Self {
let mut path = Vec::with_capacity(self.path().len() + 1);
for id in self.path() {
path.push(*id);
}
path.push(child_id);
Self(path)
}
pub fn child_of<C>(circuit: &C, node_id: NodeId) -> Self
where
C: Circuit,
{
let mut ids = circuit.global_node_id().path().to_owned();
ids.push(node_id);
Self(ids)
}
pub fn node_identifier(&self) -> String {
let mut node_ident = "n".to_string();
for i in 0..self.path().len() {
node_ident.push_str(&self.path()[i].to_string());
if i < self.path().len() - 1 {
node_ident.push('_');
}
}
node_ident
}
pub fn local_node_id(&self) -> Option<NodeId> {
self.0.last().cloned()
}
pub fn parent_id(&self) -> Option<Self> {
self.0
.split_last()
.map(|(_, prefix)| GlobalNodeId::from_path(prefix))
}
pub fn is_child_of(&self, parent: &Self) -> bool {
self.parent_id().as_ref() == Some(parent)
}
pub fn path(&self) -> &[NodeId] {
&self.0
}
pub fn top_level_ancestor(&self) -> NodeId {
self.0[0]
}
pub(crate) fn path_as_string(&self) -> String {
self.0
.iter()
.map(|node_id| node_id.0.to_string())
.collect::<Vec<_>>()
.join("-")
}
pub fn lir_node_id(&self) -> LirNodeId {
LirNodeId::new(&self.path_as_string())
}
}
/// Callback invoked for every circuit event.
type CircuitEventHandler = Box<dyn Fn(&CircuitEvent)>;
/// Callback invoked for every scheduler event; may mutate its own state.
type SchedulerEventHandler = Box<dyn FnMut(&SchedulerEvent<'_>)>;
/// Shared, name-keyed registry of circuit event handlers.
type CircuitEventHandlers = Rc<RefCell<HashMap<String, CircuitEventHandler>>>;
/// Shared, name-keyed registry of scheduler event handlers.
type SchedulerEventHandlers = Rc<RefCell<HashMap<String, SchedulerEventHandler>>>;
/// Numeric strength of a consumer's preference for receiving its input by
/// value; larger numbers compare as greater.
#[derive(Clone, Copy, PartialEq, Eq, PartialOrd, Ord, Debug, Hash)]
#[repr(transparent)]
pub struct OwnershipPreference(usize);

impl OwnershipPreference {
    /// Creates a preference with the given raw strength.
    pub const fn new(val: usize) -> Self {
        Self(val)
    }

    /// Strength 0.
    pub const INDIFFERENT: Self = Self::new(0);
    /// Strength 40.
    pub const WEAKLY_PREFER_OWNED: Self = Self::new(40);
    /// Strength 50.
    pub const PREFER_OWNED: Self = Self::new(50);
    /// Strength 100.
    pub const STRONGLY_PREFER_OWNED: Self = Self::new(100);

    /// Returns the raw strength.
    pub const fn raw(&self) -> usize {
        self.0
    }
}

impl Default for OwnershipPreference {
    /// The default preference is [`OwnershipPreference::INDIFFERENT`].
    #[inline]
    fn default() -> Self {
        Self::INDIFFERENT
    }
}

impl Display for OwnershipPreference {
    /// Writes the name of the matching named constant, or
    /// `Preference(<raw>)` for any other strength.
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        let named = if *self == Self::INDIFFERENT {
            Some("Indifferent")
        } else if *self == Self::WEAKLY_PREFER_OWNED {
            Some("WeaklyPreferOwned")
        } else if *self == Self::PREFER_OWNED {
            Some("PreferOwned")
        } else if *self == Self::STRONGLY_PREFER_OWNED {
            Some("StronglyPreferOwned")
        } else {
            None
        };
        match named {
            Some(name) => f.write_str(name),
            None => write!(f, "Preference({})", self.0),
        }
    }
}
/// An edge of the circuit graph.
#[derive(Clone)]
pub struct Edge {
    /// Source node.
    pub from: NodeId,
    /// Destination node.
    pub to: NodeId,
    /// Global id of the node the edge originates from.
    pub origin: GlobalNodeId,
    /// Metadata of the stream carried by the edge, if any.
    pub stream: Option<Box<dyn StreamMetadata>>,
    /// Ownership preference attached to the edge; `None` marks a dependency
    /// edge (see `is_dependency`).
    pub ownership_preference: Option<OwnershipPreference>,
}

#[allow(dead_code)]
impl Edge {
    /// Returns `true` when the edge has no ownership preference.
    pub(crate) fn is_dependency(&self) -> bool {
        matches!(self.ownership_preference, None)
    }

    /// Returns `true` when the edge carries stream metadata.
    pub(crate) fn is_stream(&self) -> bool {
        matches!(self.stream, Some(_))
    }

    /// Id of the stream carried by the edge, if any.
    pub(crate) fn stream_id(&self) -> Option<StreamId> {
        let meta = self.stream.as_ref()?;
        Some(meta.stream_id())
    }
}
// Cache key mapping a stream id to a `Stream<C, D>`; looked up by
// `Stream::export`.
circuit_cache_key!(ExportId<C, D>(StreamId => Stream<C, D>));
// Cache key mapping a stream id to the type-erased metadata of its replay
// stream; written by `register_replay_stream`.
circuit_cache_key!(ReplaySource(StreamId => Box<dyn StreamMetadata>));
/// Records `replay_stream` in the circuit cache as the replay source of
/// `stream`, keyed by `stream`'s id.
///
/// The entry is only written when the circuit's clock type `C::Time` is `()`;
/// for every other clock type the call is a no-op.
///
/// NOTE(review): `impl WithClock for ()` in this file declares
/// `Time = UnitTimestamp`, not `()`. Confirm which circuits actually have
/// `C::Time == ()`, otherwise this registration never fires.
pub(crate) fn register_replay_stream<C, B>(
    circuit: &C,
    stream: &Stream<C, B>,
    replay_stream: &Stream<C, B>,
) where
    C: Circuit,
    B: 'static,
{
    let clock_is_unit = TypeId::of::<()>() == TypeId::of::<C::Time>();
    if !clock_is_unit {
        return;
    }
    let key = ReplaySource::new(stream.stream_id());
    circuit.cache_insert(key, Box::new(replay_stream.clone()));
}
/// Something that carries a clock and can report its current time.
pub trait WithClock {
/// Timestamp type produced by the clock.
type Time: Timestamp;
/// Returns the current time.
fn time(&self) -> Self::Time;
}
/// The unit type acts as a clock whose time is always `UnitTimestamp`.
impl WithClock for () {
type Time = UnitTimestamp;
fn time(&self) -> Self::Time {
UnitTimestamp
}
}
impl<P, T> WithClock for ChildCircuit<P, T>
where
    P: 'static,
    T: Timestamp,
{
    type Time = T;

    /// Returns a clone of the circuit's current timestamp.
    fn time(&self) -> Self::Time {
        let now = self.time.borrow();
        T::clone(&now)
    }
}
/// JSON metadata values keyed by node id.
#[derive(Default, Debug, Clone, Serialize, Deserialize)]
pub struct CircuitMetadata {
// One JSON value per node that published metadata.
metadata: HashMap<NodeId, serde_json::Value>,
}
/// Shared state behind a `MetadataExchange` handle.
#[derive(Default, Debug)]
pub struct MetadataExchangeInner {
// Metadata published locally.
local_metadata: RefCell<CircuitMetadata>,
// List of metadata snapshots installed via `set_global_metadata`
// (presumably one entry per worker — TODO confirm).
global_metadata: RefCell<Vec<CircuitMetadata>>,
}
/// Cheaply cloneable handle to a store of per-operator JSON metadata.
///
/// The store has a local part, written through the `set_local_*` methods, and
/// a global part: a list of `CircuitMetadata` snapshots installed as a whole
/// with `set_global_metadata`. All clones of a handle share the same store.
#[derive(Default, Debug, Clone)]
pub struct MetadataExchange {
    inner: Rc<MetadataExchangeInner>,
}

impl MetadataExchange {
    /// Creates an empty store.
    fn new() -> Self {
        Self::default()
    }

    /// Returns a copy of the local metadata.
    pub fn local_metadata(&self) -> CircuitMetadata {
        self.inner.local_metadata.borrow().clone()
    }

    /// Sets the local metadata of operator `id`, replacing any previous value.
    pub fn set_local_operator_metadata(&self, id: NodeId, metadata: serde_json::Value) {
        // `metadata` is already owned, so it is moved into the map as is.
        self.inner
            .local_metadata
            .borrow_mut()
            .metadata
            .insert(id, metadata);
    }

    /// Removes the local metadata of operator `id`, if any.
    pub fn clear_local_operator_metadata(&self, id: NodeId) {
        self.inner.local_metadata.borrow_mut().metadata.remove(&id);
    }

    /// Serializes `metadata` to JSON and stores it as the local metadata of
    /// operator `id`.
    ///
    /// # Panics
    ///
    /// Panics if `metadata` cannot be converted to a JSON value.
    pub fn set_local_operator_metadata_typed<T>(&self, id: NodeId, metadata: T)
    where
        T: Serialize,
    {
        // Serialize before touching the `RefCell`, so a `Serialize` impl that
        // reads this store cannot trip over an outstanding mutable borrow.
        let value = serde_json::to_value(metadata).unwrap();
        self.set_local_operator_metadata(id, value);
    }

    /// Returns a copy of the local metadata of operator `id`, if any.
    pub fn get_local_operator_metadata(&self, id: NodeId) -> Option<serde_json::Value> {
        self.inner
            .local_metadata
            .borrow()
            .metadata
            .get(&id)
            .cloned()
    }

    /// Returns the local metadata of operator `id` deserialized as `T`.
    ///
    /// # Panics
    ///
    /// Panics if a value is present but cannot be deserialized as `T`.
    pub fn get_local_operator_metadata_typed<T>(&self, id: NodeId) -> Option<T>
    where
        T: DeserializeOwned,
    {
        self.get_local_operator_metadata(id)
            .map(|val| serde_json::from_value::<T>(val).unwrap())
    }

    /// Replaces the whole global metadata list.
    pub fn set_global_metadata(&self, global_metadata: Vec<CircuitMetadata>) {
        *self.inner.global_metadata.borrow_mut() = global_metadata;
    }

    /// Returns a copy of the global metadata list.
    pub fn get_global_metadata(&self) -> Vec<CircuitMetadata> {
        self.inner.global_metadata.borrow().clone()
    }

    /// Returns, for every entry of the global metadata list, the metadata of
    /// operator `id` in that entry (`None` where the entry has none).
    pub fn get_global_operator_metadata(&self, id: NodeId) -> Vec<Option<serde_json::Value>> {
        self.inner
            .global_metadata
            .borrow()
            .iter()
            .map(|entry| entry.metadata.get(&id).cloned())
            .collect()
    }

    /// Typed variant of `get_global_operator_metadata`.
    ///
    /// # Panics
    ///
    /// Panics if a value is present but cannot be deserialized as `T`.
    pub fn get_global_operator_metadata_typed<T>(&self, id: NodeId) -> Vec<Option<T>>
    where
        T: DeserializeOwned,
    {
        self.inner
            .global_metadata
            .borrow()
            .iter()
            .map(|entry| {
                entry
                    .metadata
                    .get(&id)
                    .cloned()
                    .map(|val| serde_json::from_value::<T>(val).unwrap())
            })
            .collect()
    }
}
/// Object-safe part of the circuit API: graph inspection, node traversal,
/// labels, metadata exchange and balancing.
pub trait CircuitBase: 'static {
/// Immutably borrows the circuit's edges.
fn edges(&self) -> Ref<'_, Edges>;
/// Mutably borrows the circuit's edges.
fn edges_mut(&self) -> RefMut<'_, Edges>;
/// Global id of the circuit, by reference.
fn global_id(&self) -> &GlobalNodeId;
/// Number of nodes in the circuit.
fn num_nodes(&self) -> usize;
/// Ids of the circuit's nodes.
fn node_ids(&self) -> Vec<NodeId>;
/// Ids of the circuit's import nodes.
fn import_nodes(&self) -> Vec<NodeId>;
/// Clears the circuit.
fn clear(&mut self);
/// Adds a dependency between nodes `from` and `to`.
fn add_dependency(&self, from: NodeId, to: NodeId);
/// For every node, the set of its transitive ancestors.
fn transitive_ancestors(&self) -> BTreeMap<NodeId, BTreeSet<NodeId>>;
/// Allocates a new stream id.
fn allocate_stream_id(&self) -> StreamId;
/// Returns the last-allocated stream id wrapped in a `RefCell`.
fn last_stream_id(&self) -> RefCell<StreamId>;
/// Scope of the circuit.
fn root_scope(&self) -> Scope;
/// Id of the circuit's own node.
fn node_id(&self) -> NodeId;
/// Global id of the circuit, by value.
fn global_node_id(&self) -> GlobalNodeId;
/// Applies `f` to the node at `path`, relative to this circuit.
fn map_node_relative(&self, path: &[NodeId], f: &mut dyn FnMut(&dyn Node));
/// Mutable counterpart of `map_node_relative`.
fn map_node_mut_relative(&self, path: &[NodeId], f: &mut dyn FnMut(&mut dyn Node));
/// Applies `f` to the circuit's nodes recursively, propagating errors
/// returned by `f`.
fn map_nodes_recursive(
&self,
f: &mut dyn FnMut(&dyn Node) -> Result<(), DbspError>,
) -> Result<(), DbspError>;
/// Mutable counterpart of `map_nodes_recursive`.
fn map_nodes_recursive_mut(
&mut self,
f: &mut dyn FnMut(&mut dyn Node) -> Result<(), DbspError>,
) -> Result<(), DbspError>;
/// Applies `f` to the nodes local to this circuit.
fn map_local_nodes(
&self,
f: &mut dyn FnMut(&dyn Node) -> Result<(), DbspError>,
) -> Result<(), DbspError>;
/// Mutable counterpart of `map_local_nodes`.
fn map_local_nodes_mut(
&self,
f: &mut dyn FnMut(&mut dyn Node) -> Result<(), DbspError>,
) -> Result<(), DbspError>;
/// Applies `f` to the local node `id`.
fn apply_local_node_mut(&self, id: NodeId, f: &mut dyn FnMut(&mut dyn Node));
/// Applies `f` to the circuit's subcircuits.
fn map_subcircuits(
&self,
f: &mut dyn FnMut(&dyn CircuitBase) -> Result<(), DbspError>,
) -> Result<(), DbspError>;
/// Sets label `key` to `val` on node `id`.
fn set_node_label(&self, id: &GlobalNodeId, key: &str, val: &str);
/// Sets the persistent-id label of node `id`; does nothing when
/// `persistent_id` is `None`.
fn set_persistent_node_id(&self, id: &GlobalNodeId, persistent_id: Option<&str>) {
if let Some(persistent_id) = persistent_id {
self.set_node_label(id, LABEL_PERSISTENT_OPERATOR_ID, persistent_id);
}
}
/// Sets the MIR-node-id label of node `id`; does nothing when `mir_id` is
/// `None`.
fn set_mir_node_id(&self, id: &GlobalNodeId, mir_id: Option<&str>) {
if let Some(mir_id) = mir_id {
self.set_node_label(id, LABEL_MIR_NODE_ID, mir_id);
}
}
/// Reads label `key` of node `id`.
fn get_node_label(&self, id: &GlobalNodeId, key: &str) -> Option<String>;
/// Reads the persistent-id label of node `id`.
fn get_persistent_node_id(&self, id: &GlobalNodeId) -> Option<String> {
self.get_node_label(id, LABEL_PERSISTENT_OPERATOR_ID)
}
/// Whether the circuit has reached a fixed point in `scope`.
fn check_fixedpoint(&self, scope: Scope) -> bool;
/// Calls `start_transaction` on every local node. The closure never fails,
/// so the traversal result is deliberately discarded.
fn notify_start_transaction(&self) {
let _ = self.map_local_nodes_mut(&mut |node| {
node.start_transaction();
Ok(())
});
}
/// Metadata exchange of the circuit.
fn metadata_exchange(&self) -> &MetadataExchange;
/// Balancer of the circuit.
fn balancer(&self) -> &Balancer;
/// Sets a balancer hint for node `global_node_id`.
fn set_balancer_hint(
&self,
global_node_id: &GlobalNodeId,
hint: BalancerHint,
) -> Result<(), DbspError>;
/// Current partitioning policy per node.
fn get_current_balancer_policy(&self) -> BTreeMap<NodeId, PartitioningPolicy>;
/// Triggers rebalancing.
fn rebalance(&self);
}
/// Full circuit-construction API.
///
/// Extends `CircuitBase` with generic methods: cache access, node evaluation
/// hooks, and the `add_*` family that inserts operators and returns their
/// output streams. Methods with a `_with_preference` suffix take an explicit
/// `OwnershipPreference` for each input stream.
pub trait Circuit: CircuitBase + Clone + WithClock {
/// Type of the parent circuit.
type Parent;
/// Returns the parent circuit.
fn parent(&self) -> Self::Parent;
/// Returns the root circuit.
fn root_circuit(&self) -> RootCircuit;
/// Whether `this` and `other` refer to the same circuit.
fn ptr_eq(this: &Self, other: &Self) -> bool;
/// Registry of circuit event handlers.
fn circuit_event_handlers(&self) -> CircuitEventHandlers;
/// Registry of scheduler event handlers.
fn scheduler_event_handlers(&self) -> SchedulerEventHandlers;
/// Delivers a circuit event to the registered handlers.
fn log_circuit_event(&self, event: &CircuitEvent);
/// Delivers a scheduler event to the registered handlers.
fn log_scheduler_event(&self, event: &SchedulerEvent<'_>);
/// Applies `f` to node `id` and returns its result.
fn map_node<T>(&self, id: &GlobalNodeId, f: &mut dyn FnMut(&dyn Node) -> T) -> T;
/// Mutable counterpart of `map_node`.
fn map_node_mut<T>(&self, id: &GlobalNodeId, f: &mut dyn FnMut(&mut dyn Node) -> T) -> T;
/// Applies `f` to the local node `id` and returns its result.
fn map_local_node_mut<T>(&self, id: NodeId, f: &mut dyn FnMut(&mut dyn Node) -> T) -> T;
/// Returns the cached value for `key`, inserting the result of `f` when
/// the key is absent.
fn cache_get_or_insert_with<K, F>(&self, key: K, f: F) -> RefMut<'_, K::Value>
where
K: 'static + TypedMapKey<CircuitStoreMarker>,
F: FnMut() -> K::Value;
/// Advances the circuit's clock.
fn tick(&self);
/// Notifies the circuit that the clock of `scope` starts.
fn clock_start(&self, scope: Scope);
/// Notifies the circuit that the clock of `scope` ends.
fn clock_end(&self, scope: Scope);
/// Whether node `id` is ready.
fn ready(&self, id: NodeId) -> bool;
/// Inserts `val` into the cache under `key`.
fn cache_insert<K>(&self, key: K, val: K::Value)
where
K: TypedMapKey<CircuitStoreMarker> + 'static;
/// Whether the cache contains `key`.
fn cache_contains<K>(&self, key: &K) -> bool
where
K: TypedMapKey<CircuitStoreMarker> + 'static;
/// Returns a clone of the cached value for `key`, if present.
fn cache_get<K>(&self, key: &K) -> Option<K::Value>
where
K: TypedMapKey<CircuitStoreMarker> + 'static,
K::Value: Clone;
/// Looks up the replay source registered for `stream_id` in the cache.
fn get_replay_source(&self, stream_id: StreamId) -> Option<Box<dyn StreamMetadata>> {
self.cache_get(&ReplaySource::new(stream_id))
}
/// Adds replay edges for `stream_id` using `replay_stream`.
fn add_replay_edges(&self, stream_id: StreamId, replay_stream: &dyn StreamMetadata);
/// Connects `stream` to node `to` with the given ownership preference.
fn connect_stream<T: 'static>(
&self,
stream: &Stream<Self, T>,
to: NodeId,
ownership_preference: OwnershipPreference,
);
/// Registers a ready callback for node `id`.
fn register_ready_callback(&self, id: NodeId, cb: Box<dyn Fn() + Send + Sync>);
/// Whether node `id` is asynchronous.
fn is_async_node(&self, id: NodeId) -> bool;
/// Evaluates node `id`; the future resolves to an optional `Position` or a
/// scheduler error.
fn eval_node(
&self,
id: NodeId,
) -> impl Future<Output = Result<Option<Position>, SchedulerError>>;
/// Evaluates import node `id`.
fn eval_import_node(&self, id: NodeId);
/// Flushes node `id`.
fn flush_node(&self, id: NodeId);
/// Whether the flush of node `id` has completed.
fn is_flush_complete(&self, id: NodeId) -> bool;
/// Runs `f` inside a named region and returns its result.
#[track_caller]
fn region<F, T>(&self, name: &str, f: F) -> T
where
F: FnOnce() -> T;
/// Registers node `preprocessor_node_id` as a preprocessor.
fn add_preprocessor(&self, preprocessor_node_id: NodeId);
/// Adds a source operator and returns its output stream.
fn add_source<O, Op>(&self, operator: Op) -> Stream<Self, O>
where
O: Data,
Op: SourceOperator<O>;
/// Adds a sender/receiver operator pair fed by `input_stream` and returns
/// the receiver's output stream.
fn add_exchange<I, SndOp, O, RcvOp>(
&self,
sender: SndOp,
receiver: RcvOp,
input_stream: &Stream<Self, I>,
) -> Stream<Self, O>
where
I: Data,
O: Data,
SndOp: SinkOperator<I>,
RcvOp: SourceOperator<O>;
/// `add_exchange` with an explicit input ownership preference.
fn add_exchange_with_preference<I, SndOp, O, RcvOp>(
&self,
sender: SndOp,
receiver: RcvOp,
input_stream: &Stream<Self, I>,
input_preference: OwnershipPreference,
) -> Stream<Self, O>
where
I: Data,
O: Data,
SndOp: SinkOperator<I>,
RcvOp: SourceOperator<O>;
/// Adds a sink operator consuming `input_stream`; returns the global id of
/// the new node.
fn add_sink<I, Op>(&self, operator: Op, input_stream: &Stream<Self, I>) -> GlobalNodeId
where
I: Data,
Op: SinkOperator<I>;
/// `add_sink` with an explicit input ownership preference.
fn add_sink_with_preference<I, Op>(
&self,
operator: Op,
input_stream: &Stream<Self, I>,
input_preference: OwnershipPreference,
) -> GlobalNodeId
where
I: Data,
Op: SinkOperator<I>;
/// Adds a sink operator with two inputs. Unlike the unary and ternary
/// variants, this method returns nothing.
fn add_binary_sink<I1, I2, Op>(
&self,
operator: Op,
input_stream1: &Stream<Self, I1>,
input_stream2: &Stream<Self, I2>,
) where
I1: Data,
I2: Data,
Op: BinarySinkOperator<I1, I2>;
/// `add_binary_sink` with explicit input ownership preferences.
fn add_binary_sink_with_preference<I1, I2, Op>(
&self,
operator: Op,
input_stream1: (&Stream<Self, I1>, OwnershipPreference),
input_stream2: (&Stream<Self, I2>, OwnershipPreference),
) where
I1: Data,
I2: Data,
Op: BinarySinkOperator<I1, I2>;
/// Adds a sink operator with three inputs; returns the global id of the
/// new node.
fn add_ternary_sink<I1, I2, I3, Op>(
&self,
operator: Op,
input_stream1: &Stream<Self, I1>,
input_stream2: &Stream<Self, I2>,
input_stream3: &Stream<Self, I3>,
) -> GlobalNodeId
where
I1: Data,
I2: Data,
I3: Data,
Op: TernarySinkOperator<I1, I2, I3>;
/// `add_ternary_sink` with explicit input ownership preferences.
fn add_ternary_sink_with_preference<I1, I2, I3, Op>(
&self,
operator: Op,
input_stream1: (&Stream<Self, I1>, OwnershipPreference),
input_stream2: (&Stream<Self, I2>, OwnershipPreference),
input_stream3: (&Stream<Self, I3>, OwnershipPreference),
) -> GlobalNodeId
where
I1: Data,
I2: Data,
I3: Data,
Op: TernarySinkOperator<I1, I2, I3>;
/// Adds a unary operator and returns its output stream.
fn add_unary_operator<I, O, Op>(
&self,
operator: Op,
input_stream: &Stream<Self, I>,
) -> Stream<Self, O>
where
I: Data,
O: Data,
Op: UnaryOperator<I, O>;
/// `add_unary_operator` with an explicit input ownership preference.
fn add_unary_operator_with_preference<I, O, Op>(
&self,
operator: Op,
input_stream: &Stream<Self, I>,
input_preference: OwnershipPreference,
) -> Stream<Self, O>
where
I: Data,
O: Data,
Op: UnaryOperator<I, O>;
/// Adds a binary operator and returns its output stream.
fn add_binary_operator<I1, I2, O, Op>(
&self,
operator: Op,
input_stream1: &Stream<Self, I1>,
input_stream2: &Stream<Self, I2>,
) -> Stream<Self, O>
where
I1: Data,
I2: Data,
O: Data,
Op: BinaryOperator<I1, I2, O>;
/// `add_binary_operator` with explicit input ownership preferences.
fn add_binary_operator_with_preference<I1, I2, O, Op>(
&self,
operator: Op,
input_stream1: (&Stream<Self, I1>, OwnershipPreference),
input_stream2: (&Stream<Self, I2>, OwnershipPreference),
) -> Stream<Self, O>
where
I1: Data,
I2: Data,
O: Data,
Op: BinaryOperator<I1, I2, O>;
/// Adds a ternary operator and returns its output stream.
fn add_ternary_operator<I1, I2, I3, O, Op>(
&self,
operator: Op,
input_stream1: &Stream<Self, I1>,
input_stream2: &Stream<Self, I2>,
input_stream3: &Stream<Self, I3>,
) -> Stream<Self, O>
where
I1: Data,
I2: Data,
I3: Data,
O: Data,
Op: TernaryOperator<I1, I2, I3, O>;
/// `add_ternary_operator` with explicit input ownership preferences.
#[allow(clippy::too_many_arguments)]
fn add_ternary_operator_with_preference<I1, I2, I3, O, Op>(
&self,
operator: Op,
input_stream1: (&Stream<Self, I1>, OwnershipPreference),
input_stream2: (&Stream<Self, I2>, OwnershipPreference),
input_stream3: (&Stream<Self, I3>, OwnershipPreference),
) -> Stream<Self, O>
where
I1: Data,
I2: Data,
I3: Data,
O: Data,
Op: TernaryOperator<I1, I2, I3, O>;
/// Adds a quaternary operator and returns its output stream.
fn add_quaternary_operator<I1, I2, I3, I4, O, Op>(
&self,
operator: Op,
input_stream1: &Stream<Self, I1>,
input_stream2: &Stream<Self, I2>,
input_stream3: &Stream<Self, I3>,
input_stream4: &Stream<Self, I4>,
) -> Stream<Self, O>
where
I1: Data,
I2: Data,
I3: Data,
I4: Data,
O: Data,
Op: QuaternaryOperator<I1, I2, I3, I4, O>;
/// `add_quaternary_operator` with explicit input ownership preferences.
#[allow(clippy::too_many_arguments)]
fn add_quaternary_operator_with_preference<I1, I2, I3, I4, O, Op>(
&self,
operator: Op,
input_stream1: (&Stream<Self, I1>, OwnershipPreference),
input_stream2: (&Stream<Self, I2>, OwnershipPreference),
input_stream3: (&Stream<Self, I3>, OwnershipPreference),
input_stream4: (&Stream<Self, I4>, OwnershipPreference),
) -> Stream<Self, O>
where
I1: Data,
I2: Data,
I3: Data,
I4: Data,
O: Data,
Op: QuaternaryOperator<I1, I2, I3, I4, O>;
/// Adds an operator with any number of same-typed inputs and returns its
/// output stream.
fn add_nary_operator<'a, I, O, Op, Iter>(
&'a self,
operator: Op,
input_streams: Iter,
) -> Stream<Self, O>
where
I: Data,
O: Data,
Op: NaryOperator<I, O>,
Iter: IntoIterator<Item = &'a Stream<Self, I>>;
/// `add_nary_operator` with one ownership preference applied to the inputs.
fn add_nary_operator_with_preference<'a, I, O, Op, Iter>(
&'a self,
operator: Op,
input_streams: Iter,
input_preference: OwnershipPreference,
) -> Stream<Self, O>
where
I: Data,
O: Data,
Op: NaryOperator<I, O>,
Iter: IntoIterator<Item = &'a Stream<Self, I>>;
/// Adds a user-defined node. `constructor` receives the id allocated for
/// the node and returns the node plus a value that is handed back to the
/// caller.
fn add_custom_node<N: Node, R>(
&self,
name: Cow<'static, str>,
constructor: impl FnOnce(NodeId) -> (N, R),
) -> R;
/// Adds a strict operator whose input is connected later: returns the
/// operator's output stream and a `FeedbackConnector` for the input side.
fn add_feedback<I, O, Op>(
&self,
operator: Op,
) -> (Stream<Self, O>, FeedbackConnector<Self, I, O, Op>)
where
I: Data,
O: Data,
Op: StrictUnaryOperator<I, O>;
/// `add_feedback`, additionally assigning `persistent_id` (when `Some`) to
/// the output stream's origin node.
fn add_feedback_persistent<I, O, Op>(
&self,
persistent_id: Option<&str>,
operator: Op,
) -> (Stream<Self, O>, FeedbackConnector<Self, I, O, Op>)
where
I: Data,
O: Data,
Op: StrictUnaryOperator<I, O>,
{
let (output, feedback) = self.add_feedback(operator);
output.set_persistent_id(persistent_id);
(output, feedback)
}
/// Like `add_feedback`, but returns an `ExportStream` carrying both the
/// local stream and its counterpart in the parent circuit.
fn add_feedback_with_export<I, O, Op>(
&self,
operator: Op,
) -> (ExportStream<Self, O>, FeedbackConnector<Self, I, O, Op>)
where
I: Data,
O: Data,
Op: StrictUnaryOperator<I, O>;
/// `add_feedback_with_export`, additionally assigning `persistent_id`
/// (when `Some`) to the local stream's origin node.
fn add_feedback_with_export_persistent<I, O, Op>(
&self,
persistent_id: Option<&str>,
operator: Op,
) -> (ExportStream<Self, O>, FeedbackConnector<Self, I, O, Op>)
where
I: Data,
O: Data,
Op: StrictUnaryOperator<I, O>,
{
let (export, feedback) = self.add_feedback_with_export(operator);
export.local.set_persistent_id(persistent_id);
(export, feedback)
}
/// Connects `input_stream` to the input side of a feedback operator whose
/// output node is `output_node_id`.
fn connect_feedback_with_preference<I, O, Op>(
&self,
output_node_id: NodeId,
operator: Rc<RefCell<Op>>,
input_stream: &Stream<Self, I>,
input_preference: OwnershipPreference,
) where
I: Data,
O: Data,
Op: StrictUnaryOperator<I, O>;
/// Adds an iterative child circuit. `child_constructor` populates the
/// child and returns a result value together with the child's executor.
fn iterative_subcircuit<F, T, E>(&self, child_constructor: F) -> Result<T, SchedulerError>
where
F: FnOnce(&mut IterativeCircuit<Self>) -> Result<(T, E), SchedulerError>,
E: Executor<IterativeCircuit<Self>>;
/// Non-iterative counterpart of `iterative_subcircuit`.
fn non_iterative_subcircuit<F, T, E>(&self, child_constructor: F) -> Result<T, SchedulerError>
where
F: FnOnce(&mut NonIterativeCircuit<Self>) -> Result<(T, E), SchedulerError>,
E: Executor<NonIterativeCircuit<Self>>;
/// Adds an iterative child circuit. `constructor` returns an async
/// callback yielding `bool` (presumably the termination check — confirm
/// against the implementation) together with a result value.
fn iterate<F, C, T>(&self, constructor: F) -> Result<T, SchedulerError>
where
F: FnOnce(&mut IterativeCircuit<Self>) -> Result<(C, T), SchedulerError>,
C: AsyncFn() -> Result<bool, SchedulerError> + 'static;
/// `iterate` with an explicitly chosen scheduler type `S`.
fn iterate_with_scheduler<F, C, T, S>(&self, constructor: F) -> Result<T, SchedulerError>
where
F: FnOnce(&mut IterativeCircuit<Self>) -> Result<(C, T), SchedulerError>,
C: AsyncFn() -> Result<bool, SchedulerError> + 'static,
S: Scheduler + 'static;
/// Adds an iterative child circuit populated by `constructor`; the name
/// suggests it runs until a fixed point is reached — confirm against the
/// implementation.
fn fixedpoint<F, T>(&self, constructor: F) -> Result<T, SchedulerError>
where
F: FnOnce(&mut IterativeCircuit<Self>) -> Result<T, SchedulerError>;
/// `fixedpoint` with an explicitly chosen scheduler type `S`.
fn fixedpoint_with_scheduler<F, T, S>(&self, constructor: F) -> Result<T, SchedulerError>
where
F: FnOnce(&mut IterativeCircuit<Self>) -> Result<T, SchedulerError>,
S: Scheduler + 'static;
/// Imports `parent_stream` from the parent circuit through `operator` and
/// returns the resulting stream in this circuit.
fn import_stream<I, O, Op>(
&self,
operator: Op,
parent_stream: &Stream<Self::Parent, I>,
) -> Stream<Self, O>
where
Self::Parent: Circuit,
I: Data,
O: Data,
Op: ImportOperator<I, O>;
/// `import_stream` with an explicit input ownership preference.
fn import_stream_with_preference<I, O, Op>(
&self,
operator: Op,
parent_stream: &Stream<Self::Parent, I>,
input_preference: OwnershipPreference,
) -> Stream<Self, O>
where
Self::Parent: Circuit,
I: Data,
O: Data,
Op: ImportOperator<I, O>;
}
pub struct Edges {
by_source: BTreeMap<NodeId, Vec<Rc<Edge>>>,
by_destination: BTreeMap<NodeId, Vec<Rc<Edge>>>,
by_stream: BTreeMap<Option<StreamId>, Vec<Rc<Edge>>>,
}
impl Edges {
fn new() -> Self {
Self {
by_source: BTreeMap::new(),
by_destination: BTreeMap::new(),
by_stream: BTreeMap::new(),
}
}
fn add_edge(&mut self, edge: Edge) {
let edge = Rc::new(edge);
self.by_source
.entry(edge.from)
.or_default()
.push(edge.clone());
self.by_destination
.entry(edge.to)
.or_default()
.push(edge.clone());
self.by_stream
.entry(edge.stream.as_ref().map(|s| s.stream_id()))
.or_default()
.push(edge);
}
fn extend<I>(&mut self, edges: I)
where
I: IntoIterator<Item = Edge>,
{
for edge in edges {
self.add_edge(edge)
}
}
pub(crate) fn iter(&self) -> impl Iterator<Item = &Edge> {
self.by_source
.values()
.flat_map(|edges| edges.iter().map(|edge| edge.as_ref()))
}
pub(crate) fn get_by_stream_id(&self, stream_id: &Option<StreamId>) -> Option<&[Rc<Edge>]> {
self.by_stream.get(stream_id).map(|v| v.as_slice())
}
fn delete_stream(&mut self, stream_id: StreamId) {
if let Some(edges) = self.by_stream.remove(&Some(stream_id)) {
for edge in edges {
if let Some(v) = self.by_source.get_mut(&edge.from) {
v.retain(|e| e.stream_id() != Some(stream_id))
}
if let Some(v) = self.by_destination.get_mut(&edge.to) {
v.retain(|e| e.stream_id() != Some(stream_id))
}
}
}
}
pub(crate) fn inputs_of(&self, node_id: NodeId) -> impl Iterator<Item = &Edge> {
self.by_destination
.get(&node_id)
.into_iter()
.flatten()
.map(|edge| edge.as_ref())
}
pub(crate) fn depend_on(&self, node_id: NodeId) -> impl Iterator<Item = &Edge> {
self.by_source.get(&node_id).into_iter().flat_map(|edges| {
edges.iter().filter_map(|edge| {
if edge.is_dependency() {
Some(edge.as_ref())
} else {
None
}
})
})
}
pub(crate) fn dependencies_of(&self, node_id: NodeId) -> impl Iterator<Item = &Edge> {
self.by_destination
.get(&node_id)
.into_iter()
.flat_map(|edges| {
edges.iter().filter_map(|edge| {
if edge.is_dependency() {
Some(edge.as_ref())
} else {
None
}
})
})
}
fn clear(&mut self) {
*self = Self::new();
}
}
/// State of a circuit: its position in the circuit hierarchy, its nodes and
/// edges, event handlers, cache, and metadata/balancing helpers.
struct CircuitInner<P>
where
P: 'static,
{
// Parent circuit.
parent: P,
// Root circuit, when one is recorded.
root: Option<RootCircuit>,
// Scope of this circuit.
root_scope: Scope,
// Id of this circuit's node.
node_id: NodeId,
// Global id of this circuit's node.
global_node_id: GlobalNodeId,
// Nodes of the circuit; each node sits in its own `RefCell`.
nodes: RefCell<Vec<RefCell<Box<dyn Node>>>>,
// Edges of the circuit.
edges: RefCell<Edges>,
// Ids of the import nodes.
import_nodes: RefCell<Vec<NodeId>>,
// Name-keyed circuit event handlers.
circuit_event_handlers: CircuitEventHandlers,
// Name-keyed scheduler event handlers.
scheduler_event_handlers: SchedulerEventHandlers,
// Typed cache of the circuit.
store: RefCell<CircuitCache>,
// Last allocated stream id.
last_stream_id: RefCell<StreamId>,
// Metadata exchange; the balancer below is constructed from it.
metadata_exchange: MetadataExchange,
// Balancer of the circuit.
balancer: Rc<Balancer>,
}
impl<P> CircuitInner<P>
where
P: 'static,
{
/// Creates an empty circuit state: no nodes, no edges, no import nodes, an
/// empty cache, and a fresh metadata exchange from which the balancer is
/// constructed. All other fields are taken from the arguments.
// One argument per field supplied by the caller.
#[allow(clippy::too_many_arguments)]
fn new(
    parent: P,
    root: Option<RootCircuit>,
    root_scope: Scope,
    node_id: NodeId,
    global_node_id: GlobalNodeId,
    circuit_event_handlers: CircuitEventHandlers,
    scheduler_event_handlers: SchedulerEventHandlers,
    last_stream_id: RefCell<StreamId>,
) -> Self {
    let metadata_exchange = MetadataExchange::new();
    let balancer = Rc::new(Balancer::new(&metadata_exchange));
    Self {
        parent,
        root,
        root_scope,
        node_id,
        global_node_id,
        nodes: RefCell::new(Vec::new()),
        edges: RefCell::new(Edges::new()),
        import_nodes: RefCell::new(Vec::new()),
        circuit_event_handlers,
        scheduler_event_handlers,
        store: RefCell::new(TypedMap::new()),
        last_stream_id,
        metadata_exchange,
        balancer,
    }
}
fn add_edge(&self, edge: Edge) {
self.edges.borrow_mut().add_edge(edge);
}
fn add_node<N>(&self, mut node: N)
where
N: Node + 'static,
{
node.init();
self.nodes
.borrow_mut()
.push(RefCell::new(Box::new(node) as Box<dyn Node>));
}
fn add_import_node(&self, node_id: NodeId) {
self.import_nodes.borrow_mut().push(node_id);
}
fn import_nodes(&self) -> Vec<NodeId> {
self.import_nodes.borrow().clone()
}
fn clear(&self) {
self.nodes.borrow_mut().clear();
self.edges.borrow_mut().clear();
self.store.borrow_mut().clear();
}
fn register_circuit_event_handler<F>(&self, name: &str, handler: F)
where
F: Fn(&CircuitEvent) + 'static,
{
self.circuit_event_handlers.borrow_mut().insert(
name.to_string(),
Box::new(handler) as Box<dyn Fn(&CircuitEvent)>,
);
}
fn unregister_circuit_event_handler(&self, name: &str) -> bool {
self.circuit_event_handlers
.borrow_mut()
.remove(name)
.is_some()
}
fn register_scheduler_event_handler<F>(&self, name: &str, handler: F)
where
F: FnMut(&SchedulerEvent<'_>) + 'static,
{
self.scheduler_event_handlers.borrow_mut().insert(
name.to_string(),
Box::new(handler) as Box<dyn FnMut(&SchedulerEvent<'_>)>,
);
}
fn unregister_scheduler_event_handler(&self, name: &str) -> bool {
self.scheduler_event_handlers
.borrow_mut()
.remove(name)
.is_some()
}
fn log_circuit_event(&self, event: &CircuitEvent) {
for (_, handler) in self.circuit_event_handlers.borrow().iter() {
handler(event)
}
}
fn log_scheduler_event(&self, event: &SchedulerEvent<'_>) {
for (_, handler) in self.scheduler_event_handlers.borrow_mut().iter_mut() {
handler(event)
}
}
fn check_fixedpoint(&self, scope: Scope) -> bool {
self.nodes.borrow().iter().all(|node| {
node.borrow().fixedpoint(scope)
})
}
}
/// Reference-counted handle to a circuit with parent type `P` and timestamp
/// type `T`.
///
/// Cloning the handle clones the two `Rc` pointers, so all clones refer to
/// the same circuit state and clock.
pub struct ChildCircuit<P, T>
where
P: 'static,
T: Timestamp,
{
/// Shared circuit state.
inner: Rc<CircuitInner<P>>,
/// Shared current time of the circuit.
time: Rc<RefCell<T>>,
}
/// Top-level circuit: no parent (`()`) and the unit timestamp.
pub type RootCircuit = ChildCircuit<(), ()>;
/// Circuit nested directly inside a [`RootCircuit`], using the nested
/// timestamp of `()`.
pub type NestedCircuit = ChildCircuit<RootCircuit, <() as Timestamp>::Nested>;
/// Child of `P` whose timestamp is the nested timestamp of `P`'s clock.
pub type IterativeCircuit<P> = ChildCircuit<P, <<P as WithClock>::Time as Timestamp>::Nested>;
/// Child of `P` that uses the same timestamp type as `P`'s clock.
pub type NonIterativeCircuit<P> = ChildCircuit<P, <P as WithClock>::Time>;
impl<P, T> Clone for ChildCircuit<P, T>
where
P: 'static,
T: Timestamp,
{
fn clone(&self) -> Self {
Self {
inner: self.inner.clone(),
time: self.time.clone(),
}
}
}
impl<P, T> ChildCircuit<P, T>
where
    P: 'static,
    T: Timestamp,
{
    /// Borrows the shared circuit state behind this handle.
    fn inner(&self) -> &CircuitInner<P> {
        &*self.inner
    }
}
impl RootCircuit {
    /// Builds a root circuit with `constructor`, scheduled by
    /// `DynamicScheduler`.
    ///
    /// See [`Self::build_with_scheduler`].
    pub fn build<F, T>(constructor: F) -> Result<(CircuitHandle, T), DbspError>
    where
        F: FnOnce(&mut RootCircuit) -> Result<T, AnyError>,
    {
        Self::build_with_scheduler::<F, T, DynamicScheduler>(constructor)
    }

    /// Builds a root circuit with `constructor`, scheduled by `S`.
    ///
    /// Returns the circuit handle together with the value produced by
    /// `constructor`.
    ///
    /// # Errors
    ///
    /// * `DbspError::Scheduler(SchedulerError::TokioError { .. })` when the
    ///   current-thread Tokio runtime cannot be built.
    /// * `DbspError::Constructor` when `constructor` fails.
    /// * Whatever error preparing the executor yields.
    pub fn build_with_scheduler<F, T, S>(constructor: F) -> Result<(CircuitHandle, T), DbspError>
    where
        F: FnOnce(&mut RootCircuit) -> Result<T, AnyError>,
        S: Scheduler + 'static,
    {
        let tokio_runtime = tokio::runtime::Builder::new_current_thread()
            .build()
            .map_err(|e| SchedulerError::TokioError {
                error: e.to_string(),
            })
            .map_err(DbspError::Scheduler)?;

        // Populate the circuit before the executor inspects it.
        let mut circuit = RootCircuit::new();
        let res = constructor(&mut circuit).map_err(DbspError::Constructor)?;

        let mut executor: Box<dyn Executor<RootCircuit>> = Box::new(<OnceExecutor<S>>::new());
        executor.prepare(&circuit, None)?;

        // Start the root clock once, right after construction.
        circuit.log_scheduler_event(&SchedulerEvent::clock_start());
        circuit.clock_start(0);

        let handle = CircuitHandle {
            circuit,
            executor,
            tokio_runtime,
            replay_info: None,
        };
        Ok((handle, res))
    }
}
impl RootCircuit {
    /// Creates an empty root circuit: unit parent, no root handle, scope `0`,
    /// root node ids, empty handler tables and a stream id counter starting
    /// at `0`.
    fn new() -> Self {
        let circuit_handlers = Rc::new(RefCell::new(HashMap::new()));
        let scheduler_handlers = Rc::new(RefCell::new(HashMap::new()));
        let inner = CircuitInner::new(
            (),
            None,
            0,
            NodeId::root(),
            GlobalNodeId::root(),
            circuit_handlers,
            scheduler_handlers,
            RefCell::new(StreamId::new(0)),
        );
        Self {
            inner: Rc::new(inner),
            time: Rc::new(RefCell::new(())),
        }
    }
}
impl RootCircuit {
    /// Registers a circuit event handler under `name`, replacing any handler
    /// already registered with that name.
    pub fn register_circuit_event_handler<F>(&self, name: &str, handler: F)
    where
        F: Fn(&CircuitEvent) + 'static,
    {
        let inner = self.inner();
        inner.register_circuit_event_handler(name, handler);
    }

    /// Removes the circuit event handler registered under `name`.
    ///
    /// Returns `true` when such a handler existed.
    pub fn unregister_circuit_event_handler(&self, name: &str) -> bool {
        let inner = self.inner();
        inner.unregister_circuit_event_handler(name)
    }

    /// Registers a scheduler event handler under `name`, replacing any
    /// handler already registered with that name.
    pub fn register_scheduler_event_handler<F>(&self, name: &str, handler: F)
    where
        F: FnMut(&SchedulerEvent<'_>) + 'static,
    {
        let inner = self.inner();
        inner.register_scheduler_event_handler(name, handler);
    }

    /// Removes the scheduler event handler registered under `name`.
    ///
    /// Returns `true` when such a handler existed.
    pub fn unregister_scheduler_event_handler(&self, name: &str) -> bool {
        let inner = self.inner();
        inner.unregister_scheduler_event_handler(name)
    }
}
impl<P, T> ChildCircuit<P, T>
where
    P: Circuit,
    T: Timestamp,
{
    /// Creates an empty circuit nested in `parent` as node `id`.
    ///
    /// The child takes the parent's event handler tables, its root circuit
    /// handle and the value returned by the parent's `last_stream_id()`; its
    /// scope is the parent's scope plus one and its clock is initialized with
    /// `Timestamp::clock_start()`.
    fn with_parent(parent: P, id: NodeId) -> Self {
        // Everything derived from `parent` is read before it is moved below.
        let child_global_id = parent.global_node_id().child(id);
        let circuit_event_handlers = parent.circuit_event_handlers();
        let scheduler_event_handlers = parent.scheduler_event_handlers();
        let child_scope = parent.root_scope() + 1;
        let stream_id_counter = parent.last_stream_id();
        let root_handle = parent.root_circuit();

        let inner = CircuitInner::new(
            parent,
            Some(root_handle),
            child_scope,
            id,
            child_global_id,
            circuit_event_handlers,
            scheduler_event_handlers,
            stream_id_counter,
        );
        ChildCircuit {
            inner: Rc::new(inner),
            time: Rc::new(RefCell::new(<T as Timestamp>::clock_start())),
        }
    }

    /// Returns `true` when `other` is this circuit's parent according to
    /// `P::ptr_eq`.
    pub fn is_child_of(&self, other: &P) -> bool {
        let parent = &self.inner().parent;
        P::ptr_eq(parent, other)
    }
}
impl<P, T> ChildCircuit<P, T>
where
    P: 'static,
    T: Timestamp,
    Self: Circuit,
{
    /// Id of this circuit within its parent.
    fn node_id(&self) -> NodeId {
        self.inner().node_id
    }

    /// Adds the node built by `f` to the circuit.
    ///
    /// `f` receives the id the new node will have (the current node count)
    /// and returns the node plus a value that is handed back to the caller.
    /// The node list is not borrowed while `f` runs.
    fn add_node<F, N, V>(&self, f: F) -> V
    where
        F: FnOnce(NodeId) -> (N, V),
        N: Node + 'static,
    {
        let next_id = NodeId(self.inner().nodes.borrow().len());
        let (node, value) = f(next_id);
        self.inner().add_node(node);
        value
    }

    /// Records `node_id` as an import node of this circuit.
    fn add_import_node(&self, node_id: NodeId) {
        let inner = self.inner();
        inner.add_import_node(node_id);
    }

    /// Fallible variant of [`Self::add_node`]: when `f` fails, no node is
    /// added and the error is returned.
    fn try_add_node<F, N, V, E>(&self, f: F) -> Result<V, E>
    where
        F: FnOnce(NodeId) -> Result<(N, V), E>,
        N: Node + 'static,
    {
        let next_id = NodeId(self.inner().nodes.borrow().len());
        let (node, value) = f(next_id)?;
        self.inner().add_node(node);
        Ok(value)
    }

    /// Sends `event` to all registered circuit event handlers.
    fn log_circuit_event(&self, event: &CircuitEvent) {
        let inner = self.inner();
        inner.log_circuit_event(event);
    }

    /// Sends `event` to all registered scheduler event handlers.
    pub(super) fn log_scheduler_event(&self, event: &SchedulerEvent<'_>) {
        let inner = self.inner();
        inner.log_scheduler_event(event);
    }
}
impl<P, T> CircuitBase for ChildCircuit<P, T>
where
    P: Clone + 'static,
    T: Timestamp,
{
    /// Shared borrow of the circuit's edge index.
    fn edges(&self) -> Ref<'_, Edges> {
        self.inner().edges.borrow()
    }

    /// For every node of this circuit, computes the set of nodes from which
    /// it can be reached by following input edges backwards.
    fn transitive_ancestors(&self) -> BTreeMap<NodeId, BTreeSet<NodeId>> {
        let edges = self.edges();
        let closure: BTreeMap<NodeId, BTreeSet<NodeId>> = self
            .node_ids()
            .into_iter()
            .map(|node_id| {
                let mut ancestors = BTreeSet::new();
                // Work list of nodes whose inputs still have to be visited.
                let mut pending = vec![node_id];
                while let Some(current) = pending.pop() {
                    for edge in edges.inputs_of(current) {
                        // Only expand a node the first time it is seen.
                        if ancestors.insert(edge.from) {
                            pending.push(edge.from);
                        }
                    }
                }
                (node_id, ancestors)
            })
            .collect();
        closure
    }

    /// Mutable borrow of the circuit's edge index.
    fn edges_mut(&self) -> RefMut<'_, Edges> {
        self.inner().edges.borrow_mut()
    }

    /// Number of nodes in this circuit.
    fn num_nodes(&self) -> usize {
        self.inner().nodes.borrow().len()
    }

    /// Drops all nodes, edges and cached values of this circuit.
    fn clear(&mut self) {
        let inner = self.inner();
        inner.clear();
    }

    /// Adds a dependency edge (an edge that carries no stream) from `from`
    /// to `to`, after reporting it to the circuit event handlers.
    fn add_dependency(&self, from: NodeId, to: NodeId) {
        let source = self.global_node_id().child(from);
        let target = self.global_node_id().child(to);
        self.log_circuit_event(&CircuitEvent::dependency(source, target));

        let edge = Edge {
            from,
            to,
            origin: self.global_node_id().child(from),
            stream: None,
            ownership_preference: None,
        };
        self.inner().add_edge(edge);
    }

    /// Applies `f` to the node addressed by `path`, relative to this circuit.
    ///
    /// Panics when `path` is empty or its first element is out of range.
    fn map_node_relative(&self, path: &[NodeId], f: &mut dyn FnMut(&dyn Node)) {
        let nodes = self.inner().nodes.borrow();
        let head = nodes[path[0].0].borrow();
        if path.len() != 1 {
            // Let the node resolve the remaining path among its children.
            head.map_child(&path[1..], &mut |node| f(node));
        } else {
            f(head.as_ref())
        }
    }

    /// Mutable counterpart of `map_node_relative`.
    ///
    /// Panics when `path` is empty or its first element is out of range.
    fn map_node_mut_relative(&self, path: &[NodeId], f: &mut dyn FnMut(&mut dyn Node)) {
        let nodes = self.inner().nodes.borrow();
        let mut head = nodes[path[0].0].borrow_mut();
        if path.len() != 1 {
            // Let the node resolve the remaining path among its children.
            head.map_child_mut(&path[1..], &mut |node| f(node));
        } else {
            f(head.as_mut())
        }
    }

    /// Applies `f` to every node of this circuit and then lets each node
    /// apply it recursively, stopping at the first error.
    fn map_nodes_recursive(
        &self,
        f: &mut dyn FnMut(&dyn Node) -> Result<(), DbspError>,
    ) -> Result<(), DbspError> {
        for cell in self.inner().nodes.borrow().iter() {
            f(cell.borrow().as_ref())?;
            cell.borrow().map_nodes_recursive(f)?;
        }
        Ok(())
    }

    /// Mutable counterpart of `map_nodes_recursive`.
    fn map_nodes_recursive_mut(
        &mut self,
        f: &mut dyn FnMut(&mut dyn Node) -> Result<(), DbspError>,
    ) -> Result<(), DbspError> {
        for cell in self.inner().nodes.borrow_mut().iter_mut() {
            f(cell.borrow_mut().as_mut())?;
            cell.borrow_mut().map_nodes_recursive_mut(f)?;
        }
        Ok(())
    }

    /// Applies `f` to the nodes of this circuit only, stopping at the first
    /// error.
    fn map_local_nodes(
        &self,
        f: &mut dyn FnMut(&dyn Node) -> Result<(), DbspError>,
    ) -> Result<(), DbspError> {
        for cell in self.inner().nodes.borrow().iter() {
            let node = cell.borrow();
            f(node.as_ref())?;
        }
        Ok(())
    }

    /// Mutable counterpart of `map_local_nodes`.
    fn map_local_nodes_mut(
        &self,
        f: &mut dyn FnMut(&mut dyn Node) -> Result<(), DbspError>,
    ) -> Result<(), DbspError> {
        for cell in self.inner().nodes.borrow_mut().iter_mut() {
            let mut node = cell.borrow_mut();
            f(node.as_mut())?;
        }
        Ok(())
    }

    /// Applies `f` to the local node `id`.
    fn apply_local_node_mut(&self, id: NodeId, f: &mut dyn FnMut(&mut dyn Node)) {
        let path = [id];
        self.map_node_mut_relative(&path, &mut |node| f(node));
    }

    /// Applies `f` to every node of this circuit for which `as_circuit`
    /// returns a circuit, stopping at the first error.
    fn map_subcircuits(
        &self,
        f: &mut dyn FnMut(&dyn CircuitBase) -> Result<(), DbspError>,
    ) -> Result<(), DbspError> {
        for cell in self.inner().nodes.borrow().iter() {
            let node = cell.borrow();
            match node.as_circuit() {
                Some(subcircuit) => f(subcircuit)?,
                None => {}
            }
        }
        Ok(())
    }

    /// Sets label `key` to `val` on the node identified by `id`.
    fn set_node_label(&self, id: &GlobalNodeId, key: &str, val: &str) {
        self.map_node_mut(id, &mut |node| node.set_label(key, val));
    }

    /// Returns an owned copy of label `key` of the node identified by `id`,
    /// if the node has one.
    fn get_node_label(&self, id: &GlobalNodeId, key: &str) -> Option<String> {
        self.map_node(id, &mut |node| {
            node.get_label(key).map(|label| label.to_string())
        })
    }

    /// Borrows the global id of this circuit.
    fn global_id(&self) -> &GlobalNodeId {
        &self.inner().global_node_id
    }

    /// Collects the local ids reported by the nodes, in node-list order.
    fn node_ids(&self) -> Vec<NodeId> {
        let nodes = self.inner().nodes.borrow();
        let mut ids = Vec::with_capacity(nodes.len());
        for cell in nodes.iter() {
            ids.push(cell.borrow().local_id());
        }
        ids
    }

    /// Returns a copy of the list of import node ids.
    fn import_nodes(&self) -> Vec<NodeId> {
        self.inner().import_nodes()
    }

    /// Increments this circuit's stream id counter and returns the new value.
    fn allocate_stream_id(&self) -> StreamId {
        let mut counter = self.inner().last_stream_id.borrow_mut();
        counter.0 += 1;
        *counter
    }

    /// Returns a clone of the stream id counter cell (a new `RefCell`
    /// holding the current value).
    fn last_stream_id(&self) -> RefCell<StreamId> {
        self.inner().last_stream_id.clone()
    }

    /// Scope of this circuit.
    fn root_scope(&self) -> Scope {
        self.inner().root_scope
    }

    /// Id of this circuit within its parent.
    fn node_id(&self) -> NodeId {
        self.inner().node_id
    }

    /// Owned copy of this circuit's global id.
    fn global_node_id(&self) -> GlobalNodeId {
        self.inner().global_node_id.clone()
    }

    /// Returns `true` when every node reports a fixed point for `scope`.
    fn check_fixedpoint(&self, scope: Scope) -> bool {
        self.inner().check_fixedpoint(scope)
    }

    /// Borrows the circuit's metadata exchange.
    fn metadata_exchange(&self) -> &MetadataExchange {
        &self.inner().metadata_exchange
    }

    /// Borrows the circuit's balancer.
    fn balancer(&self) -> &Balancer {
        &self.inner().balancer
    }

    /// Forwards `hint` for node `global_node_id` to the balancer.
    ///
    /// # Errors
    ///
    /// Returns `BalancerError::NonTopLevelNode` when the node's parent is not
    /// the root circuit; otherwise returns whatever `Balancer::set_hint`
    /// returns.
    fn set_balancer_hint(
        &self,
        global_node_id: &GlobalNodeId,
        hint: BalancerHint,
    ) -> Result<(), DbspError> {
        if global_node_id.parent_id() != Some(GlobalNodeId::root()) {
            let error = BalancerError::NonTopLevelNode(global_node_id.clone());
            return Err(DbspError::Balancer(error));
        }
        let local_id = global_node_id.local_node_id().unwrap();
        self.inner().balancer.set_hint(local_id, hint)
    }

    /// Returns the policy currently reported by the balancer.
    fn get_current_balancer_policy(&self) -> BTreeMap<NodeId, PartitioningPolicy> {
        self.inner().balancer.get_policy()
    }

    /// Asks the balancer to rebalance.
    fn rebalance(&self) {
        self.inner().balancer.rebalance()
    }
}
impl<P, T> Circuit for ChildCircuit<P, T>
where
P: Clone + 'static,
T: Timestamp,
{
/// Type of the circuit this circuit is nested in.
type Parent = P;

/// Returns a clone of the parent circuit.
fn parent(&self) -> P {
    let inner = self.inner();
    inner.parent.clone()
}
/// Returns a handle to the root circuit: a clone of `self` when this circuit
/// is the root, otherwise a clone of the stored `root` handle.
///
/// # Panics
///
/// Panics when `Self` is not `RootCircuit` and no root handle was stored.
fn root_circuit(&self) -> RootCircuit {
// The root circuit stores `root: None`, so it has to be recognized by type.
if <dyn Any>::is::<RootCircuit>(self) {
// SAFETY: the `Any::is` check above established that `Self` is exactly
// `RootCircuit`, so the transmute converts a reference to its own type.
unsafe { transmute::<&Self, &RootCircuit>(self) }.clone()
} else {
// `with_parent` constructs child circuits with `Some(root)`.
self.inner().root.as_ref().unwrap().clone()
}
}
/// Applies `f` to the node identified by `id` and returns its result.
///
/// # Panics
///
/// Panics when `id` does not lie below this circuit, or when `f` was never
/// invoked for the node.
fn map_node<V>(&self, id: &GlobalNodeId, f: &mut dyn FnMut(&dyn Node) -> V) -> V {
    let path = id.path();
    assert!(path.starts_with(self.global_id().path()));
    // Path of the node relative to this circuit.
    let relative = path.strip_prefix(self.global_id().path()).unwrap();

    let mut outcome: Option<V> = None;
    self.map_node_relative(relative, &mut |node| outcome = Some(f(node)));
    outcome.unwrap()
}
/// Applies `f` to a mutable reference to the node identified by `id` and
/// returns its result.
///
/// # Panics
///
/// Panics when `id` does not lie below this circuit, or when `f` was never
/// invoked for the node.
fn map_node_mut<V>(&self, id: &GlobalNodeId, f: &mut dyn FnMut(&mut dyn Node) -> V) -> V {
    let path = id.path();
    assert!(path.starts_with(self.global_id().path()));
    // Path of the node relative to this circuit.
    let relative = path.strip_prefix(self.global_id().path()).unwrap();

    let mut outcome: Option<V> = None;
    self.map_node_mut_relative(relative, &mut |node| outcome = Some(f(node)));
    outcome.unwrap()
}
/// Applies `f` to a mutable reference to the local node `id` and returns its
/// result.
fn map_local_node_mut<V>(&self, id: NodeId, f: &mut dyn FnMut(&mut dyn Node) -> V) -> V {
    let path = [id];
    let mut outcome: Option<V> = None;
    self.map_node_mut_relative(&path, &mut |node| outcome = Some(f(node)));
    outcome.unwrap()
}
/// Returns `true` when both handles point at the same circuit state.
fn ptr_eq(this: &Self, other: &Self) -> bool {
    let (lhs, rhs) = (&this.inner, &other.inner);
    Rc::ptr_eq(lhs, rhs)
}
/// Returns a clone of the handle to the circuit event handler table.
fn circuit_event_handlers(&self) -> CircuitEventHandlers {
    let inner = self.inner();
    inner.circuit_event_handlers.clone()
}
/// Returns a clone of the handle to the scheduler event handler table.
fn scheduler_event_handlers(&self) -> SchedulerEventHandlers {
    let inner = self.inner();
    inner.scheduler_event_handlers.clone()
}
/// Sends `event` to all registered circuit event handlers.
fn log_circuit_event(&self, event: &CircuitEvent) {
    let inner = self.inner();
    inner.log_circuit_event(event);
}
/// Sends `event` to all registered scheduler event handlers.
fn log_scheduler_event(&self, event: &SchedulerEvent<'_>) {
    let inner = self.inner();
    inner.log_scheduler_event(event);
}
/// Returns a mutable borrow of the cached value for `key`, creating it with
/// `f` when the cache has no entry yet.
///
/// The returned `RefMut` keeps the whole cache mutably borrowed until it is
/// dropped.
fn cache_get_or_insert_with<K, F>(&self, key: K, mut f: F) -> RefMut<'_, K::Value>
where
K: 'static + TypedMapKey<CircuitStoreMarker>,
F: FnMut() -> K::Value,
{
// Fast path: the value exists already. The shared borrow used for the check
// ends with the `if` condition, before the mutable borrow is taken.
if self.inner().store.borrow().contains_key(&key) {
return RefMut::map(self.inner().store.borrow_mut(), |store| {
store.get_mut(&key).unwrap()
});
}
// `f` runs while this function holds no borrow of the cache, so it may use
// the cache itself.
let new = f();
// NOTE(review): presumably `or_insert` keeps an entry that `f` created for
// the same key in the meantime and discards `new` -- confirm against the
// `typedmap` entry API.
RefMut::map(self.inner().store.borrow_mut(), |store| {
store.entry(key).or_insert(new)
})
}
/// Connects `stream` to the local node `to`: reports the connection to the
/// circuit event handlers and records the corresponding edge.
///
/// In debug builds, asserts that `stream` belongs to this circuit.
fn connect_stream<V: 'static>(
    &self,
    stream: &Stream<Self, V>,
    to: NodeId,
    ownership_preference: OwnershipPreference,
) {
    let origin = stream.origin_node_id().clone();
    let destination = self.global_node_id().child(to);
    self.log_circuit_event(&CircuitEvent::stream(
        origin.clone(),
        destination,
        ownership_preference,
    ));
    debug_assert_eq!(self.global_node_id(), stream.circuit.global_node_id());

    let edge = Edge {
        from: stream.local_node_id(),
        to,
        origin,
        stream: Some(Box::new(stream.clone())),
        ownership_preference: Some(ownership_preference),
    };
    self.inner().add_edge(edge);
}
/// Advances the circuit's clock with `advance(0)`.
fn tick(&self) {
    let mut clock = self.time.borrow_mut();
    let next = clock.advance(0);
    *clock = next;
}
/// Calls `clock_start(scope)` on every node, in node-list order.
fn clock_start(&self, scope: Scope) {
    self.inner().nodes.borrow_mut().iter_mut().for_each(|node| {
        node.borrow_mut().clock_start(scope);
    });
}
/// Calls `clock_end(scope)` on every node, in node-list order, and then
/// advances the circuit's clock with `advance(scope + 1)`.
fn clock_end(&self, scope: Scope) {
    self.inner().nodes.borrow_mut().iter_mut().for_each(|node| {
        node.borrow_mut().clock_end(scope);
    });

    let mut clock = self.time.borrow_mut();
    let next = clock.advance(scope + 1);
    *clock = next;
}
/// Returns what node `id` reports from `ready()`.
///
/// Panics when `id` is out of range.
fn ready(&self, id: NodeId) -> bool {
    let nodes = self.inner().nodes.borrow();
    let node = nodes[id.0].borrow();
    node.ready()
}
/// Stores `val` under `key` in the circuit cache.
fn cache_insert<K>(&self, key: K, val: K::Value)
where
    K: TypedMapKey<CircuitStoreMarker> + 'static,
{
    let mut store = self.inner().store.borrow_mut();
    store.insert(key, val);
}
/// Returns `true` when the circuit cache has an entry for `key`.
fn cache_contains<K>(&self, key: &K) -> bool
where
    K: TypedMapKey<CircuitStoreMarker> + 'static,
{
    let store = self.inner().store.borrow();
    store.contains_key(key)
}
/// Returns a clone of the cached value for `key`, or `None` when the cache
/// has no entry for it.
fn cache_get<K>(&self, key: &K) -> Option<K::Value>
where
    K: TypedMapKey<CircuitStoreMarker> + 'static,
    K::Value: Clone,
{
    let store = self.inner().store.borrow();
    store.get(key).cloned()
}
/// Hands `cb` to node `id` via its `register_ready_callback`.
///
/// Panics when `id` is out of range.
fn register_ready_callback(&self, id: NodeId, cb: Box<dyn Fn() + Send + Sync>) {
    let nodes = self.inner().nodes.borrow();
    let mut node = nodes[id.0].borrow_mut();
    node.register_ready_callback(cb);
}
/// Returns what node `id` reports from `is_async()`.
///
/// Panics when `id` is out of range.
fn is_async_node(&self, id: NodeId) -> bool {
    let nodes = self.inner().nodes.borrow();
    let node = nodes[id.0].borrow();
    node.is_async()
}
/// Evaluates node `id`, bracketing the evaluation with `eval_start` and
/// `eval_end` scheduler events, and returns what the node's `eval` yields.
///
/// # Errors
///
/// Propagates the error returned by the node's `eval`. In that case the
/// function returns early: the span is not recorded and no `eval_end` event
/// is sent.
// The node list and the node stay borrowed across the `.await` below, which
// is what this lint flags.
#[allow(clippy::await_holding_refcell_ref)]
async fn eval_node(&self, id: NodeId) -> Result<Option<Position>, SchedulerError> {
let circuit = self.inner();
debug_assert!(id.0 < circuit.nodes.borrow().len());
// Handlers run while the node list and the node are borrowed (shared).
circuit.log_scheduler_event(&SchedulerEvent::eval_start(
circuit.nodes.borrow()[id.0].borrow().as_ref(),
));
// The tooltip is produced by a closure that takes its own short-lived
// borrows of the node.
let span = SamplySpan::new("eval")
.with_category("Operator")
.with_tooltip(|| {
let nodes = circuit.nodes.borrow();
let node = nodes[id.0].borrow();
format!("{} {}", node.name(), node.global_id())
});
// Shared borrow of the node list and mutable borrow of the node are held
// until the future completes.
let progress = circuit.nodes.borrow()[id.0].borrow_mut().eval().await?;
span.record();
circuit.log_scheduler_event(&SchedulerEvent::eval_end(
circuit.nodes.borrow()[id.0].borrow().as_ref(),
));
Ok(progress)
}
/// Calls `import()` on node `id`.
///
/// In debug builds, asserts that `id` is in range and registered as an
/// import node.
fn eval_import_node(&self, id: NodeId) {
    let circuit = self.inner();
    debug_assert!(id.0 < circuit.nodes.borrow().len());
    debug_assert!(circuit.import_nodes().contains(&id));

    let nodes = circuit.nodes.borrow();
    let mut node = nodes[id.0].borrow_mut();
    node.import();
}
/// Calls `flush()` on node `id`.
///
/// In debug builds, asserts that `id` is in range.
fn flush_node(&self, id: NodeId) {
    let circuit = self.inner();
    debug_assert!(id.0 < circuit.nodes.borrow().len());

    let nodes = circuit.nodes.borrow();
    let mut node = nodes[id.0].borrow_mut();
    node.flush();
}
/// Returns what node `id` reports from `is_flush_complete()`.
///
/// In debug builds, asserts that `id` is in range.
fn is_flush_complete(&self, id: NodeId) -> bool {
    let circuit = self.inner();
    debug_assert!(id.0 < circuit.nodes.borrow().len());

    let nodes = circuit.nodes.borrow();
    let node = nodes[id.0].borrow();
    node.is_flush_complete()
}
/// Runs `f` between a `push_region` event (carrying `name` and the caller's
/// source location) and a `pop_region` event, and returns what `f` returns.
#[track_caller]
fn region<F, V>(&self, name: &str, f: F) -> V
where
    F: FnOnce() -> V,
{
    let opened = CircuitEvent::push_region(name, Some(Location::caller()));
    self.log_circuit_event(&opened);
    let value = f();
    self.log_circuit_event(&CircuitEvent::pop_region());
    value
}
/// Adds a dependency edge from `preprocessor_node_id` to every node of this
/// circuit that reports `is_input()`.
fn add_preprocessor(&self, preprocessor_node_id: NodeId) {
    // The scan only reads the node list, so a shared borrow is enough.
    // `add_dependency` invokes user-supplied circuit event handlers; with an
    // exclusive borrow held here, a handler that reads the node list would
    // panic on the `RefCell`.
    for node in self.inner().nodes.borrow().iter() {
        let node = node.borrow();
        if node.is_input() {
            self.add_dependency(preprocessor_node_id, node.local_id());
        }
    }
}
/// Adds a source node wrapping `operator` and returns its output stream.
fn add_source<O, Op>(&self, operator: Op) -> Stream<Self, O>
where
    O: Data,
    Op: SourceOperator<O>,
{
    self.add_node(|id| {
        let node_global_id = GlobalNodeId::child_of(self, id);
        self.log_circuit_event(&CircuitEvent::operator(
            node_global_id,
            operator.name(),
            operator.location(),
        ));

        let source = SourceNode::new(operator, self.clone(), id);
        let output = source.output_stream();
        (source, output)
    })
}
/// Adds a sender/receiver pair, connecting `input_stream` to the sender with
/// the sender's own input preference.
///
/// See `add_exchange_with_preference`.
fn add_exchange<I, SndOp, O, RcvOp>(
    &self,
    sender: SndOp,
    receiver: RcvOp,
    input_stream: &Stream<Self, I>,
) -> Stream<Self, O>
where
    I: Data,
    O: Data,
    SndOp: SinkOperator<I>,
    RcvOp: SourceOperator<O>,
{
    let input_preference = sender.input_preference();
    self.add_exchange_with_preference(sender, receiver, input_stream, input_preference)
}
/// Adds a sink node for `sender` fed by `input_stream` and a source node for
/// `receiver`, plus a dependency edge from the sender node to the receiver
/// node. Returns the receiver's output stream.
fn add_exchange_with_preference<I, SndOp, O, RcvOp>(
    &self,
    sender: SndOp,
    receiver: RcvOp,
    input_stream: &Stream<Self, I>,
    input_preference: OwnershipPreference,
) -> Stream<Self, O>
where
    I: Data,
    O: Data,
    SndOp: SinkOperator<I>,
    RcvOp: SourceOperator<O>,
{
    // Sending half: a sink consuming `input_stream`.
    let sender_id = self.add_node(|id| {
        let node_global_id = GlobalNodeId::child_of(self, id);
        self.log_circuit_event(&CircuitEvent::operator(
            node_global_id,
            sender.name(),
            sender.location(),
        ));
        let sink = SinkNode::new(sender, input_stream.clone(), self.clone(), id);
        self.connect_stream(input_stream, id, input_preference);
        (sink, id)
    });

    // Receiving half: a source producing the returned stream.
    let received = self.add_node(|id| {
        let node_global_id = GlobalNodeId::child_of(self, id);
        self.log_circuit_event(&CircuitEvent::operator(
            node_global_id,
            receiver.name(),
            receiver.location(),
        ));
        let source = SourceNode::new(receiver, self.clone(), id);
        let output = source.output_stream();
        (source, output)
    });

    self.add_dependency(sender_id, received.local_node_id());
    received
}
/// Adds a sink node for `operator`, connected to `input_stream` with the
/// operator's own input preference. Returns the node's global id.
fn add_sink<I, Op>(&self, operator: Op, input_stream: &Stream<Self, I>) -> GlobalNodeId
where
    I: Data,
    Op: SinkOperator<I>,
{
    let input_preference = operator.input_preference();
    self.add_sink_with_preference(operator, input_stream, input_preference)
}
/// Adds a sink node for `operator`, connected to `input_stream` with
/// `input_preference`. Returns the node's global id.
fn add_sink_with_preference<I, Op>(
    &self,
    operator: Op,
    input_stream: &Stream<Self, I>,
    input_preference: OwnershipPreference,
) -> GlobalNodeId
where
    I: Data,
    Op: SinkOperator<I>,
{
    self.add_node(|id| {
        let node_global_id = GlobalNodeId::child_of(self, id);
        self.log_circuit_event(&CircuitEvent::operator(
            node_global_id.clone(),
            operator.name(),
            operator.location(),
        ));
        self.connect_stream(input_stream, id, input_preference);

        let sink = SinkNode::new(operator, input_stream.clone(), self.clone(), id);
        (sink, node_global_id)
    })
}
/// Adds a two-input sink node for `operator`, using the operator's own input
/// preferences.
fn add_binary_sink<I1, I2, Op>(
    &self,
    operator: Op,
    input_stream1: &Stream<Self, I1>,
    input_stream2: &Stream<Self, I2>,
) where
    I1: Data,
    I2: Data,
    Op: BinarySinkOperator<I1, I2>,
{
    let (pref1, pref2) = operator.input_preference();
    let first = (input_stream1, pref1);
    let second = (input_stream2, pref2);
    self.add_binary_sink_with_preference(operator, first, second)
}
/// Adds a two-input sink node for `operator`; each input is given as a
/// `(stream, ownership preference)` pair.
fn add_binary_sink_with_preference<I1, I2, Op>(
    &self,
    operator: Op,
    input_stream1: (&Stream<Self, I1>, OwnershipPreference),
    input_stream2: (&Stream<Self, I2>, OwnershipPreference),
) where
    I1: Data,
    I2: Data,
    Op: BinarySinkOperator<I1, I2>,
{
    let (stream1, pref1) = input_stream1;
    let (stream2, pref2) = input_stream2;
    self.add_node(|id| {
        let node_global_id = GlobalNodeId::child_of(self, id);
        self.log_circuit_event(&CircuitEvent::operator(
            node_global_id,
            operator.name(),
            operator.location(),
        ));
        let sink = BinarySinkNode::new(
            operator,
            stream1.clone(),
            stream2.clone(),
            self.clone(),
            id,
        );
        self.connect_stream(stream1, id, pref1);
        self.connect_stream(stream2, id, pref2);
        (sink, ())
    });
}
/// Adds a three-input sink node for `operator`, using the operator's own
/// input preferences. Returns the node's global id.
fn add_ternary_sink<I1, I2, I3, Op>(
    &self,
    operator: Op,
    input_stream1: &Stream<Self, I1>,
    input_stream2: &Stream<Self, I2>,
    input_stream3: &Stream<Self, I3>,
) -> GlobalNodeId
where
    I1: Data,
    I2: Data,
    I3: Data,
    Op: TernarySinkOperator<I1, I2, I3>,
{
    let (pref1, pref2, pref3) = operator.input_preference();
    let first = (input_stream1, pref1);
    let second = (input_stream2, pref2);
    let third = (input_stream3, pref3);
    self.add_ternary_sink_with_preference(operator, first, second, third)
}
/// Adds a three-input sink node for `operator`; each input is given as a
/// `(stream, ownership preference)` pair. Returns the node's global id.
fn add_ternary_sink_with_preference<I1, I2, I3, Op>(
    &self,
    operator: Op,
    input_stream1: (&Stream<Self, I1>, OwnershipPreference),
    input_stream2: (&Stream<Self, I2>, OwnershipPreference),
    input_stream3: (&Stream<Self, I3>, OwnershipPreference),
) -> GlobalNodeId
where
    I1: Data,
    I2: Data,
    I3: Data,
    Op: TernarySinkOperator<I1, I2, I3>,
{
    let (stream1, pref1) = input_stream1;
    let (stream2, pref2) = input_stream2;
    let (stream3, pref3) = input_stream3;
    self.add_node(|id| {
        let node_global_id = GlobalNodeId::child_of(self, id);
        self.log_circuit_event(&CircuitEvent::operator(
            node_global_id.clone(),
            operator.name(),
            operator.location(),
        ));
        let sink = TernarySinkNode::new(
            operator,
            stream1.clone(),
            stream2.clone(),
            stream3.clone(),
            self.clone(),
            id,
        );
        self.connect_stream(stream1, id, pref1);
        self.connect_stream(stream2, id, pref2);
        self.connect_stream(stream3, id, pref3);
        (sink, node_global_id)
    })
}
/// Adds a node for the unary `operator`, using the operator's own input
/// preference. Returns the node's output stream.
fn add_unary_operator<I, O, Op>(
    &self,
    operator: Op,
    input_stream: &Stream<Self, I>,
) -> Stream<Self, O>
where
    I: Data,
    O: Data,
    Op: UnaryOperator<I, O>,
{
    let input_preference = operator.input_preference();
    self.add_unary_operator_with_preference(operator, input_stream, input_preference)
}
/// Adds a node for the unary `operator`, connected to `input_stream` with
/// `input_preference`. Returns the node's output stream.
fn add_unary_operator_with_preference<I, O, Op>(
    &self,
    operator: Op,
    input_stream: &Stream<Self, I>,
    input_preference: OwnershipPreference,
) -> Stream<Self, O>
where
    I: Data,
    O: Data,
    Op: UnaryOperator<I, O>,
{
    self.add_node(|id| {
        let node_global_id = GlobalNodeId::child_of(self, id);
        self.log_circuit_event(&CircuitEvent::operator(
            node_global_id,
            operator.name(),
            operator.location(),
        ));
        let unary = UnaryNode::new(operator, input_stream.clone(), self.clone(), id);
        let output = unary.output_stream();
        self.connect_stream(input_stream, id, input_preference);
        (unary, output)
    })
}
/// Adds a node for the binary `operator`, using the operator's own input
/// preferences. Returns the node's output stream.
fn add_binary_operator<I1, I2, O, Op>(
    &self,
    operator: Op,
    input_stream1: &Stream<Self, I1>,
    input_stream2: &Stream<Self, I2>,
) -> Stream<Self, O>
where
    I1: Data,
    I2: Data,
    O: Data,
    Op: BinaryOperator<I1, I2, O>,
{
    let (preference1, preference2) = operator.input_preference();
    let first = (input_stream1, preference1);
    let second = (input_stream2, preference2);
    self.add_binary_operator_with_preference(operator, first, second)
}
/// Adds a node for the binary `operator`; each input is given as a
/// `(stream, ownership preference)` pair. Returns the node's output stream.
fn add_binary_operator_with_preference<I1, I2, O, Op>(
    &self,
    operator: Op,
    input_stream1: (&Stream<Self, I1>, OwnershipPreference),
    input_stream2: (&Stream<Self, I2>, OwnershipPreference),
) -> Stream<Self, O>
where
    I1: Data,
    I2: Data,
    O: Data,
    Op: BinaryOperator<I1, I2, O>,
{
    let (stream1, pref1) = input_stream1;
    let (stream2, pref2) = input_stream2;
    self.add_node(|id| {
        let node_global_id = GlobalNodeId::child_of(self, id);
        self.log_circuit_event(&CircuitEvent::operator(
            node_global_id,
            operator.name(),
            operator.location(),
        ));
        let binary = BinaryNode::new(
            operator,
            stream1.clone(),
            stream2.clone(),
            self.clone(),
            id,
        );
        let output = binary.output_stream();
        self.connect_stream(stream1, id, pref1);
        self.connect_stream(stream2, id, pref2);
        (binary, output)
    })
}
/// Adds a node for the ternary `operator`, using the operator's own input
/// preferences. Returns the node's output stream.
fn add_ternary_operator<I1, I2, I3, O, Op>(
    &self,
    operator: Op,
    input_stream1: &Stream<Self, I1>,
    input_stream2: &Stream<Self, I2>,
    input_stream3: &Stream<Self, I3>,
) -> Stream<Self, O>
where
    I1: Data,
    I2: Data,
    I3: Data,
    O: Data,
    Op: TernaryOperator<I1, I2, I3, O>,
{
    let (preference1, preference2, preference3) = operator.input_preference();
    let first = (input_stream1, preference1);
    let second = (input_stream2, preference2);
    let third = (input_stream3, preference3);
    self.add_ternary_operator_with_preference(operator, first, second, third)
}
/// Adds a node for the ternary `operator`; each input is given as a
/// `(stream, ownership preference)` pair. Returns the node's output stream.
#[allow(clippy::too_many_arguments)]
fn add_ternary_operator_with_preference<I1, I2, I3, O, Op>(
    &self,
    operator: Op,
    input_stream1: (&Stream<Self, I1>, OwnershipPreference),
    input_stream2: (&Stream<Self, I2>, OwnershipPreference),
    input_stream3: (&Stream<Self, I3>, OwnershipPreference),
) -> Stream<Self, O>
where
    I1: Data,
    I2: Data,
    I3: Data,
    O: Data,
    Op: TernaryOperator<I1, I2, I3, O>,
{
    let (stream1, pref1) = input_stream1;
    let (stream2, pref2) = input_stream2;
    let (stream3, pref3) = input_stream3;
    self.add_node(|id| {
        let node_global_id = GlobalNodeId::child_of(self, id);
        self.log_circuit_event(&CircuitEvent::operator(
            node_global_id,
            operator.name(),
            operator.location(),
        ));
        let ternary = TernaryNode::new(
            operator,
            stream1.clone(),
            stream2.clone(),
            stream3.clone(),
            self.clone(),
            id,
        );
        let output = ternary.output_stream();
        self.connect_stream(stream1, id, pref1);
        self.connect_stream(stream2, id, pref2);
        self.connect_stream(stream3, id, pref3);
        (ternary, output)
    })
}
/// Adds a node for the quaternary `operator`, using the operator's own input
/// preferences. Returns the node's output stream.
fn add_quaternary_operator<I1, I2, I3, I4, O, Op>(
    &self,
    operator: Op,
    input_stream1: &Stream<Self, I1>,
    input_stream2: &Stream<Self, I2>,
    input_stream3: &Stream<Self, I3>,
    input_stream4: &Stream<Self, I4>,
) -> Stream<Self, O>
where
    I1: Data,
    I2: Data,
    I3: Data,
    I4: Data,
    O: Data,
    Op: QuaternaryOperator<I1, I2, I3, I4, O>,
{
    let (preference1, preference2, preference3, preference4) = operator.input_preference();
    let first = (input_stream1, preference1);
    let second = (input_stream2, preference2);
    let third = (input_stream3, preference3);
    let fourth = (input_stream4, preference4);
    self.add_quaternary_operator_with_preference(operator, first, second, third, fourth)
}
/// Adds a node for the quaternary `operator`; each input is given as a
/// `(stream, ownership preference)` pair. Returns the node's output stream.
#[allow(clippy::too_many_arguments)]
fn add_quaternary_operator_with_preference<I1, I2, I3, I4, O, Op>(
    &self,
    operator: Op,
    input_stream1: (&Stream<Self, I1>, OwnershipPreference),
    input_stream2: (&Stream<Self, I2>, OwnershipPreference),
    input_stream3: (&Stream<Self, I3>, OwnershipPreference),
    input_stream4: (&Stream<Self, I4>, OwnershipPreference),
) -> Stream<Self, O>
where
    I1: Data,
    I2: Data,
    I3: Data,
    I4: Data,
    O: Data,
    Op: QuaternaryOperator<I1, I2, I3, I4, O>,
{
    let (stream1, pref1) = input_stream1;
    let (stream2, pref2) = input_stream2;
    let (stream3, pref3) = input_stream3;
    let (stream4, pref4) = input_stream4;
    self.add_node(|id| {
        let node_global_id = GlobalNodeId::child_of(self, id);
        self.log_circuit_event(&CircuitEvent::operator(
            node_global_id,
            operator.name(),
            operator.location(),
        ));
        let quaternary = QuaternaryNode::new(
            operator,
            stream1.clone(),
            stream2.clone(),
            stream3.clone(),
            stream4.clone(),
            self.clone(),
            id,
        );
        let output = quaternary.output_stream();
        self.connect_stream(stream1, id, pref1);
        self.connect_stream(stream2, id, pref2);
        self.connect_stream(stream3, id, pref3);
        self.connect_stream(stream4, id, pref4);
        (quaternary, output)
    })
}
/// Adds a node for the n-ary `operator` over `input_streams`, using the
/// operator's own input preference for every input. Returns the node's
/// output stream.
fn add_nary_operator<'a, I, O, Op, Iter>(
    &'a self,
    operator: Op,
    input_streams: Iter,
) -> Stream<Self, O>
where
    I: Data,
    O: Data,
    Op: NaryOperator<I, O>,
    Iter: IntoIterator<Item = &'a Stream<Self, I>>,
{
    let input_preference = operator.input_preference();
    self.add_nary_operator_with_preference(operator, input_streams, input_preference)
}
/// Adds a node for the n-ary `operator` over `input_streams`, connecting
/// every input with `input_preference`. Returns the node's output stream.
fn add_nary_operator_with_preference<'a, I, O, Op, Iter>(
    &'a self,
    operator: Op,
    input_streams: Iter,
    input_preference: OwnershipPreference,
) -> Stream<Self, O>
where
    I: Data,
    O: Data,
    Op: NaryOperator<I, O>,
    Iter: IntoIterator<Item = &'a Stream<Self, I>>,
{
    // Materialize the inputs once: the node gets its own copy and the same
    // list is walked again to register the edges.
    let streams: Vec<Stream<_, _>> = input_streams.into_iter().cloned().collect();
    self.add_node(|id| {
        let node_global_id = GlobalNodeId::child_of(self, id);
        self.log_circuit_event(&CircuitEvent::operator(
            node_global_id,
            operator.name(),
            operator.location(),
        ));
        let nary = NaryNode::new(operator, streams.clone(), self.clone(), id);
        let output = nary.output_stream();
        for stream in &streams {
            self.connect_stream(stream, id, input_preference);
        }
        (nary, output)
    })
}
/// Adds a node built by `constructor` and reports it to the circuit event
/// handlers as an operator called `name`, located at the caller of this
/// method.
///
/// `constructor` receives the id assigned to the new node and returns the
/// node plus a value that is handed back to the caller.
#[track_caller]
fn add_custom_node<N: Node, R>(
    &self,
    name: Cow<'static, str>,
    constructor: impl FnOnce(NodeId) -> (N, R),
) -> R {
    // Capture the location here: closures do not inherit `#[track_caller]`,
    // so `Location::caller()` evaluated inside the closure below would report
    // a line of this function instead of the caller's.
    let location = Location::caller();
    self.add_node(|id| {
        self.log_circuit_event(&CircuitEvent::operator(
            GlobalNodeId::child_of(self, id),
            name,
            Some(location),
        ));
        constructor(id)
    })
}
/// Adds the output half of the strict `operator`.
///
/// Returns the node's output stream and a `FeedbackConnector` created with
/// the node id, a clone of this circuit and a shared handle to the operator.
fn add_feedback<I, O, Op>(
    &self,
    operator: Op,
) -> (Stream<Self, O>, FeedbackConnector<Self, I, O, Op>)
where
    I: Data,
    O: Data,
    Op: StrictUnaryOperator<I, O>,
{
    self.add_node(|id| {
        let node_global_id = GlobalNodeId::child_of(self, id);
        self.log_circuit_event(&CircuitEvent::strict_operator_output(
            node_global_id,
            operator.name(),
            operator.location(),
        ));

        // The operator is shared between the output node and the connector.
        let shared_operator = Rc::new(RefCell::new(operator));
        let connector = FeedbackConnector::new(id, self.clone(), shared_operator.clone());
        let output_node = FeedbackOutputNode::new(shared_operator, self.clone(), id);
        let output = output_node.output_stream();
        (output_node, (output, connector))
    })
}
/// Like `add_feedback`, but builds the output node with
/// `FeedbackOutputNode::with_export` and returns an `ExportStream` holding
/// both the local and the export stream.
///
/// Panics when the output node has no export stream.
fn add_feedback_with_export<I, O, Op>(
    &self,
    operator: Op,
) -> (ExportStream<Self, O>, FeedbackConnector<Self, I, O, Op>)
where
    I: Data,
    O: Data,
    Op: StrictUnaryOperator<I, O>,
{
    self.add_node(|id| {
        let node_global_id = GlobalNodeId::child_of(self, id);
        self.log_circuit_event(&CircuitEvent::strict_operator_output(
            node_global_id,
            operator.name(),
            operator.location(),
        ));

        // The operator is shared between the output node and the connector.
        let shared_operator = Rc::new(RefCell::new(operator));
        let connector = FeedbackConnector::new(id, self.clone(), shared_operator.clone());
        let output_node = FeedbackOutputNode::with_export(shared_operator, self.clone(), id);
        let local = output_node.output_stream();
        let export = output_node.export_stream.clone().unwrap();
        let streams = ExportStream { local, export };
        (output_node, (streams, connector))
    })
}
/// Adds the input half of a strict operator: a `FeedbackInputNode` fed by
/// `input_stream`, plus a dependency edge from `output_node_id` to the new
/// node.
fn connect_feedback_with_preference<I, O, Op>(
    &self,
    output_node_id: NodeId,
    operator: Rc<RefCell<Op>>,
    input_stream: &Stream<Self, I>,
    input_preference: OwnershipPreference,
) where
    I: Data,
    O: Data,
    Op: StrictUnaryOperator<I, O>,
{
    self.add_node(|id| {
        let node_global_id = GlobalNodeId::child_of(self, id);
        self.log_circuit_event(&CircuitEvent::strict_operator_input(
            node_global_id,
            output_node_id,
        ));
        let input_node = FeedbackInputNode::new(operator, input_stream.clone(), id);
        self.connect_stream(input_stream, id, input_preference);
        self.add_dependency(output_node_id, id);
        (input_node, ())
    })
}
/// Adds a child circuit of type `IterativeCircuit<Self>` as a node of this
/// circuit.
///
/// `child_constructor` populates the child and returns a value for the
/// caller together with the executor that will run the child.
///
/// # Errors
///
/// Returns the error of `child_constructor`; in that case no node is added
/// and no `subcircuit_complete` event is sent.
fn iterative_subcircuit<F, V, E>(&self, child_constructor: F) -> Result<V, SchedulerError>
where
    F: FnOnce(&mut IterativeCircuit<Self>) -> Result<(V, E), SchedulerError>,
    E: Executor<IterativeCircuit<Self>>,
{
    self.try_add_node(|id| {
        let subcircuit_id = GlobalNodeId::child_of(self, id);
        self.log_circuit_event(&CircuitEvent::subcircuit(subcircuit_id.clone(), true));

        let mut subcircuit = ChildCircuit::with_parent(self.clone(), id);
        let (value, executor) = child_constructor(&mut subcircuit)?;
        let node = <ChildNode<IterativeCircuit<Self>>>::new::<E>(subcircuit, 1, executor);

        self.log_circuit_event(&CircuitEvent::subcircuit_complete(subcircuit_id));
        Ok((node, value))
    })
}
/// Adds a child circuit of type `NonIterativeCircuit<Self>` as a node of
/// this circuit.
///
/// `child_constructor` populates the child and returns a value for the
/// caller together with the executor that will run the child.
///
/// # Errors
///
/// Returns the error of `child_constructor`; in that case no node is added
/// and no `subcircuit_complete` event is sent.
fn non_iterative_subcircuit<F, V, E>(&self, child_constructor: F) -> Result<V, SchedulerError>
where
    F: FnOnce(&mut NonIterativeCircuit<Self>) -> Result<(V, E), SchedulerError>,
    E: Executor<NonIterativeCircuit<Self>>,
{
    self.try_add_node(|id| {
        let subcircuit_id = GlobalNodeId::child_of(self, id);
        self.log_circuit_event(&CircuitEvent::subcircuit(subcircuit_id.clone(), false));

        let mut subcircuit = ChildCircuit::with_parent(self.clone(), id);
        let (value, executor) = child_constructor(&mut subcircuit)?;
        let node = <ChildNode<NonIterativeCircuit<Self>>>::new::<E>(subcircuit, 0, executor);

        self.log_circuit_event(&CircuitEvent::subcircuit_complete(subcircuit_id));
        Ok((node, value))
    })
}
/// Adds an iterative child circuit scheduled by `DynamicScheduler`.
///
/// See `iterate_with_scheduler` for the meaning of `constructor`.
fn iterate<F, C, V>(&self, constructor: F) -> Result<V, SchedulerError>
where
    F: FnOnce(&mut IterativeCircuit<Self>) -> Result<(C, V), SchedulerError>,
    C: AsyncFn() -> Result<bool, SchedulerError> + 'static,
{
    self.iterate_with_scheduler::<F, C, V, DynamicScheduler>(constructor)
}
fn iterate_with_scheduler<F, C, V, S>(&self, constructor: F) -> Result<V, SchedulerError>
where
F: FnOnce(&mut IterativeCircuit<Self>) -> Result<(C, V), SchedulerError>,
C: AsyncFn() -> Result<bool, SchedulerError> + 'static,
S: Scheduler + 'static,
{
self.iterative_subcircuit(|child| {
let (termination_check, res) = constructor(child)?;
let mut executor = <IterativeExecutor<_, S>>::new(termination_check);
executor.prepare(child, None)?;
Ok((res, executor))
})
}
/// Builds a fixed-point subcircuit using the default `DynamicScheduler`.
///
/// Thin wrapper around `fixedpoint_with_scheduler`; see that method for
/// the contract of `constructor`.
fn fixedpoint<F, V>(&self, constructor: F) -> Result<V, SchedulerError>
where
    F: FnOnce(&mut IterativeCircuit<Self>) -> Result<V, SchedulerError>,
{
    // The scheduler type is the only parameter fixed here.
    self.fixedpoint_with_scheduler::<F, V, DynamicScheduler>(constructor)
}
fn fixedpoint_with_scheduler<F, V, S>(&self, constructor: F) -> Result<V, SchedulerError>
where
F: FnOnce(&mut IterativeCircuit<Self>) -> Result<V, SchedulerError>,
S: Scheduler + 'static,
{
self.iterative_subcircuit(|child| {
let res = constructor(child)?;
let child_clone = child.clone();
let consensus = Consensus::new();
let termination_check = async move || {
let local_fixedpoint = child_clone.inner().check_fixedpoint(0);
consensus.check(local_fixedpoint).await
};
let mut executor = <IterativeExecutor<_, S>>::new(termination_check);
executor.prepare(child, None)?;
Ok((res, executor))
})
}
/// Imports `parent_stream` from the parent circuit into this circuit via
/// `operator`, using the operator's own input ownership preference.
///
/// Delegates to `import_stream_with_preference`.
fn import_stream<I, O, Op>(&self, operator: Op, parent_stream: &Stream<P, I>) -> Stream<Self, O>
where
    Self::Parent: Circuit,
    I: Data,
    O: Data,
    Op: ImportOperator<I, O>,
{
    // Read the preference before `operator` is moved into the call below.
    let input_preference = operator.input_preference();
    self.import_stream_with_preference(operator, parent_stream, input_preference)
}
/// Imports `parent_stream` from the parent circuit into this circuit via
/// `operator`, with an explicit ownership preference for the input.
///
/// A new `ImportNode` is added to this circuit. The parent circuit is told
/// that this (child) node consumes `parent_stream`, and the connection is
/// logged as a stream event on the parent. The new node is finally
/// registered as an import node of this circuit.
///
/// Returns the output stream of the new import node.
///
/// # Panics
///
/// Panics if `self` is not a child of the circuit that `parent_stream`
/// belongs to.
fn import_stream_with_preference<I, O, Op>(
    &self,
    operator: Op,
    parent_stream: &Stream<P, I>,
    input_preference: OwnershipPreference,
) -> Stream<Self, O>
where
    Self::Parent: Circuit,
    I: Data,
    O: Data,
    Op: ImportOperator<I, O>,
{
    assert!(self.is_child_of(parent_stream.circuit()));

    let output_stream = self.add_node(|id| {
        let node_id = self.global_node_id().child(id);
        self.log_circuit_event(&CircuitEvent::operator(
            node_id.clone(),
            operator.name(),
            operator.location(),
        ));

        let node = ImportNode::new(operator, self.clone(), parent_stream.clone(), id);

        // From the parent's point of view the consumer of the stream is
        // this child circuit's node, not the import node inside it.
        self.parent()
            .connect_stream(parent_stream, self.node_id(), input_preference);
        // `node_id` is not needed after this event, so it is moved rather
        // than cloned.
        self.parent().log_circuit_event(&CircuitEvent::stream(
            parent_stream.origin_node_id().clone(),
            node_id,
            input_preference,
        ));

        let output_stream = node.output_stream();
        (node, output_stream)
    });

    self.add_import_node(output_stream.local_node_id());
    output_stream
}
/// Duplicates every edge that carries stream `stream_id` so that the same
/// destinations are also fed from `replay_stream`.
///
/// Each new edge keeps the destination and ownership preference of the
/// edge it mirrors, but originates from `replay_stream`. Does nothing when
/// no edges are registered for `stream_id`.
fn add_replay_edges(&self, stream_id: StreamId, replay_stream: &dyn StreamMetadata) {
    let mut edges = self.edges_mut();

    // Build the mirrored edges first; the edge set is only extended once
    // the lookup borrow has ended.
    let mirrored: Vec<_> = match edges.get_by_stream_id(&Some(stream_id)) {
        None => return,
        Some(existing) => existing
            .into_iter()
            .map(|edge| Edge {
                from: replay_stream.local_node_id(),
                to: edge.to,
                origin: replay_stream.origin_node_id().clone(),
                stream: Some(clone_box(replay_stream)),
                ownership_preference: edge.ownership_preference,
            })
            .collect(),
    };

    edges.extend(mirrored);
}
}
/// Circuit node that wraps an import operator: it reads a stream that
/// belongs to the parent circuit (`C::Parent`) and produces a stream in
/// circuit `C`.
struct ImportNode<C: Circuit, I, O, Op> {
    /// Global id of this node.
    id: GlobalNodeId,
    /// The wrapped operator.
    operator: Op,
    /// Input stream, owned by the parent circuit.
    parent_stream: Stream<C::Parent, I>,
    /// Output stream, owned by this node's circuit.
    output_stream: Stream<C, O>,
    /// Key/value labels attached to the node.
    labels: BTreeMap<String, String>,
}
impl<C, I, O, Op> ImportNode<C, I, O, Op>
where
    C: Circuit,
    C::Parent: Circuit,
    I: Clone + 'static,
    O: Clone + 'static,
    Op: ImportOperator<I, O>,
{
    /// Creates an import node with local id `id` in `circuit`, reading from
    /// `parent_stream`.
    ///
    /// # Panics
    ///
    /// Panics if `parent_stream` does not belong to the parent of `circuit`.
    fn new(operator: Op, circuit: C, parent_stream: Stream<C::Parent, I>, id: NodeId) -> Self {
        assert!(Circuit::ptr_eq(&circuit.parent(), parent_stream.circuit()));

        let global_id = circuit.global_node_id().child(id);
        let output_stream = Stream::new(circuit, id);

        Self {
            id: global_id,
            operator,
            parent_stream,
            output_stream,
            labels: BTreeMap::new(),
        }
    }

    /// Returns a clone of the handle to this node's output stream.
    fn output_stream(&self) -> Stream<C, O> {
        self.output_stream.clone()
    }
}
/// `Node` implementation for `ImportNode`. Apart from `eval`, `import` and
/// the label accessors, every method forwards directly to the wrapped
/// operator.
impl<C, I, O, Op> Node for ImportNode<C, I, O, Op>
where
    C: Circuit,
    C::Parent: Circuit,
    I: Clone + 'static,
    O: Clone + 'static,
    Op: ImportOperator<I, O>,
{
    fn name(&self) -> Cow<'static, str> {
        self.operator.name()
    }

    /// Local id of the node, i.e. the last component of its global id.
    fn local_id(&self) -> NodeId {
        self.id.local_node_id().unwrap()
    }

    fn global_id(&self) -> &GlobalNodeId {
        &self.id
    }

    fn is_async(&self) -> bool {
        self.operator.is_async()
    }

    fn is_input(&self) -> bool {
        self.operator.is_input()
    }

    fn ready(&self) -> bool {
        self.operator.ready()
    }

    fn register_ready_callback(&mut self, cb: Box<dyn Fn() + Send + Sync>) {
        self.operator.register_ready_callback(cb);
    }

    /// Evaluates the operator, stores its output in the output stream and
    /// returns the operator's flush progress.
    fn eval<'a>(
        &'a mut self,
    ) -> Pin<Box<dyn Future<Output = Result<Option<Position>, SchedulerError>> + 'a>> {
        Box::pin(async {
            let produced = self.operator.eval().await;
            self.output_stream.put(produced);
            Ok(self.operator.flush_progress())
        })
    }

    /// Hands the current value of the parent stream to the operator: by
    /// value when `StreamValue::take` yields one, otherwise by reference.
    /// A token is consumed on the parent stream afterwards in either case.
    fn import(&mut self) {
        if let Some(owned) = StreamValue::take(self.parent_stream.val()) {
            self.operator.import_owned(owned);
        } else {
            self.operator
                .import(StreamValue::peek(&self.parent_stream.get()));
        }
        StreamValue::consume_token(self.parent_stream.val());
    }

    fn start_transaction(&mut self) {
        self.operator.start_transaction();
    }

    fn flush(&mut self) {
        self.operator.flush();
    }

    fn is_flush_complete(&self) -> bool {
        self.operator.is_flush_complete()
    }

    fn clock_start(&mut self, scope: Scope) {
        self.operator.clock_start(scope);
    }

    fn clock_end(&mut self, scope: Scope) {
        self.operator.clock_end(scope);
    }

    fn init(&mut self) {
        self.operator.init(&self.id);
    }

    fn metadata(&self, output: &mut OperatorMeta) {
        self.operator.metadata(output);
    }

    fn fixedpoint(&self, scope: Scope) -> bool {
        self.operator.fixedpoint(scope)
    }

    /// Checkpoints the operator under `base`, keyed by this node's
    /// persistent id.
    fn checkpoint(
        &mut self,
        base: &StoragePath,
        files: &mut Vec<Arc<dyn FileCommitter>>,
    ) -> Result<(), DbspError> {
        let persistent_id = self.persistent_id();
        self.operator
            .checkpoint(base, persistent_id.as_deref(), files)
    }

    /// Restores the operator from `base`, keyed by this node's persistent id.
    fn restore(&mut self, base: &StoragePath) -> Result<(), DbspError> {
        let persistent_id = self.persistent_id();
        self.operator.restore(base, persistent_id.as_deref())
    }

    fn clear_state(&mut self) -> Result<(), DbspError> {
        self.operator.clear_state()
    }

    fn start_replay(&mut self) -> Result<(), DbspError> {
        self.operator.start_replay()
    }

    fn end_replay(&mut self) -> Result<(), DbspError> {
        self.operator.end_replay()
    }

    fn is_replay_complete(&self) -> bool {
        self.operator.is_replay_complete()
    }

    /// Inserts or overwrites label `key`.
    fn set_label(&mut self, key: &str, value: &str) {
        self.labels.insert(key.to_owned(), value.to_owned());
    }

    /// Looks up label `key`.
    fn get_label(&self, key: &str) -> Option<&str> {
        self.labels.get(key).map(String::as_str)
    }

    fn labels(&self) -> &BTreeMap<String, String> {
        &self.labels
    }

    fn as_any(&self) -> &dyn Any {
        self
    }
}
/// Circuit node that wraps a source operator: it has no input streams and
/// a single output stream in circuit `C`.
struct SourceNode<C, O, Op> {
    /// Global id of this node.
    id: GlobalNodeId,
    /// The wrapped operator.
    operator: Op,
    /// Stream that receives the operator's output.
    output_stream: Stream<C, O>,
    /// Key/value labels attached to the node.
    labels: BTreeMap<String, String>,
}
impl<C, O, Op> SourceNode<C, O, Op>
where
    Op: SourceOperator<O>,
    C: Circuit,
{
    /// Creates a source node with local id `id` in `circuit`.
    fn new(operator: Op, circuit: C, id: NodeId) -> Self {
        let global_id = circuit.global_node_id().child(id);
        let output_stream = Stream::new(circuit, id);

        Self {
            id: global_id,
            operator,
            output_stream,
            labels: BTreeMap::new(),
        }
    }

    /// Returns a clone of the handle to this node's output stream.
    fn output_stream(&self) -> Stream<C, O> {
        self.output_stream.clone()
    }
}
/// `Node` implementation for `SourceNode`. Apart from `eval` and the label
/// accessors, every method forwards directly to the wrapped operator.
impl<C, O, Op> Node for SourceNode<C, O, Op>
where
    C: Circuit,
    O: Clone + 'static,
    Op: SourceOperator<O>,
{
    fn name(&self) -> Cow<'static, str> {
        self.operator.name()
    }

    /// Local id of the node, i.e. the last component of its global id.
    fn local_id(&self) -> NodeId {
        self.id.local_node_id().unwrap()
    }

    fn global_id(&self) -> &GlobalNodeId {
        &self.id
    }

    fn is_async(&self) -> bool {
        self.operator.is_async()
    }

    fn is_input(&self) -> bool {
        self.operator.is_input()
    }

    fn ready(&self) -> bool {
        self.operator.ready()
    }

    fn register_ready_callback(&mut self, cb: Box<dyn Fn() + Send + Sync>) {
        self.operator.register_ready_callback(cb);
    }

    /// Evaluates the operator, stores its output in the output stream and
    /// returns the operator's flush progress.
    fn eval<'a>(
        &'a mut self,
    ) -> Pin<Box<dyn Future<Output = Result<Option<Position>, SchedulerError>> + 'a>> {
        Box::pin(async {
            let produced = self.operator.eval().await;
            self.output_stream.put(produced);
            Ok(self.operator.flush_progress())
        })
    }

    fn start_transaction(&mut self) {
        self.operator.start_transaction();
    }

    fn flush(&mut self) {
        self.operator.flush();
    }

    fn is_flush_complete(&self) -> bool {
        self.operator.is_flush_complete()
    }

    fn clock_start(&mut self, scope: Scope) {
        self.operator.clock_start(scope);
    }

    fn clock_end(&mut self, scope: Scope) {
        self.operator.clock_end(scope);
    }

    fn init(&mut self) {
        self.operator.init(&self.id);
    }

    fn metadata(&self, output: &mut OperatorMeta) {
        self.operator.metadata(output);
    }

    fn fixedpoint(&self, scope: Scope) -> bool {
        self.operator.fixedpoint(scope)
    }

    /// Checkpoints the operator under `base`, keyed by this node's
    /// persistent id.
    fn checkpoint(
        &mut self,
        base: &StoragePath,
        files: &mut Vec<Arc<dyn FileCommitter>>,
    ) -> Result<(), DbspError> {
        let persistent_id = self.persistent_id();
        self.operator
            .checkpoint(base, persistent_id.as_deref(), files)
    }

    /// Restores the operator from `base`, keyed by this node's persistent id.
    fn restore(&mut self, base: &StoragePath) -> Result<(), DbspError> {
        let persistent_id = self.persistent_id();
        self.operator.restore(base, persistent_id.as_deref())
    }

    fn clear_state(&mut self) -> Result<(), DbspError> {
        self.operator.clear_state()
    }

    fn start_replay(&mut self) -> Result<(), DbspError> {
        self.operator.start_replay()
    }

    fn is_replay_complete(&self) -> bool {
        self.operator.is_replay_complete()
    }

    fn end_replay(&mut self) -> Result<(), DbspError> {
        self.operator.end_replay()
    }

    /// Inserts or overwrites label `key`.
    fn set_label(&mut self, key: &str, value: &str) {
        self.labels.insert(key.to_owned(), value.to_owned());
    }

    /// Looks up label `key`.
    fn get_label(&self, key: &str) -> Option<&str> {
        self.labels.get(key).map(String::as_str)
    }

    fn labels(&self) -> &BTreeMap<String, String> {
        &self.labels
    }

    fn as_any(&self) -> &dyn Any {
        self
    }
}
/// Circuit node that wraps a unary operator: one input stream and one
/// output stream, both in circuit `C`.
struct UnaryNode<C, I, O, Op> {
    /// Global id of this node.
    id: GlobalNodeId,
    /// The wrapped operator.
    operator: Op,
    /// Stream the operator reads from.
    input_stream: Stream<C, I>,
    /// Stream that receives the operator's output.
    output_stream: Stream<C, O>,
    /// Key/value labels attached to the node.
    labels: BTreeMap<String, String>,
}
impl<C, I, O, Op> UnaryNode<C, I, O, Op>
where
    Op: UnaryOperator<I, O>,
    C: Circuit,
{
    /// Creates a unary node with local id `id` in `circuit`, reading from
    /// `input_stream`.
    fn new(operator: Op, input_stream: Stream<C, I>, circuit: C, id: NodeId) -> Self {
        let global_id = circuit.global_node_id().child(id);
        let output_stream = Stream::new(circuit, id);

        Self {
            id: global_id,
            operator,
            input_stream,
            output_stream,
            labels: BTreeMap::new(),
        }
    }

    /// Returns a clone of the handle to this node's output stream.
    fn output_stream(&self) -> Stream<C, O> {
        self.output_stream.clone()
    }
}
/// `Node` implementation for `UnaryNode`. Apart from `eval` and the label
/// accessors, every method forwards directly to the wrapped operator.
impl<C, I, O, Op> Node for UnaryNode<C, I, O, Op>
where
C: Circuit,
I: Clone + 'static,
O: Clone + 'static,
Op: UnaryOperator<I, O>,
{
fn name(&self) -> Cow<'static, str> {
self.operator.name()
}
/// Local id of the node, i.e. the last component of its global id.
fn local_id(&self) -> NodeId {
self.id.local_node_id().unwrap()
}
fn global_id(&self) -> &GlobalNodeId {
&self.id
}
fn is_async(&self) -> bool {
self.operator.is_async()
}
fn is_input(&self) -> bool {
self.operator.is_input()
}
fn ready(&self) -> bool {
self.operator.ready()
}
fn register_ready_callback(&mut self, cb: Box<dyn Fn() + Send + Sync>) {
self.operator.register_ready_callback(cb);
}
/// Evaluates the operator on the current input value and stores the
/// result in the output stream.
///
/// If `StreamValue::take` yields the input by value, the operator's
/// `eval_owned` is used; otherwise the operator sees a borrowed value via
/// `eval`. A token is consumed on the input stream afterwards and the
/// operator's flush progress is returned.
// The borrow obtained from `get()` is held across the `.await` below,
// hence the lint exemption.
#[allow(clippy::await_holding_refcell_ref)]
fn eval<'a>(
&'a mut self,
) -> Pin<Box<dyn Future<Output = Result<Option<Position>, SchedulerError>> + 'a>> {
Box::pin(async {
self.output_stream
.put(match StreamValue::take(self.input_stream.val()) {
Some(v) => self.operator.eval_owned(v).await,
None => {
self.operator
.eval(StreamValue::peek(&self.input_stream.get()))
.await
}
});
// Runs only after the `put` statement (and any borrow it created) ended.
StreamValue::consume_token(self.input_stream.val());
Ok(self.operator.flush_progress())
})
}
// Transaction, flush, clock and metadata hooks: forwarded unchanged.
fn start_transaction(&mut self) {
self.operator.start_transaction();
}
fn flush(&mut self) {
self.operator.flush();
}
fn is_flush_complete(&self) -> bool {
self.operator.is_flush_complete()
}
fn clock_start(&mut self, scope: Scope) {
self.operator.clock_start(scope);
}
fn clock_end(&mut self, scope: Scope) {
self.operator.clock_end(scope);
}
/// Initializes the operator with this node's global id.
fn init(&mut self) {
self.operator.init(&self.id);
}
fn metadata(&self, output: &mut OperatorMeta) {
self.operator.metadata(output);
}
fn fixedpoint(&self, scope: Scope) -> bool {
self.operator.fixedpoint(scope)
}
/// Checkpoints the operator under `base`, keyed by this node's
/// persistent id.
fn checkpoint(
&mut self,
base: &StoragePath,
files: &mut Vec<Arc<dyn FileCommitter>>,
) -> Result<(), DbspError> {
self.operator
.checkpoint(base, self.persistent_id().as_deref(), files)
}
/// Restores the operator from `base`, keyed by this node's persistent id.
fn restore(&mut self, base: &StoragePath) -> Result<(), DbspError> {
self.operator.restore(base, self.persistent_id().as_deref())
}
fn clear_state(&mut self) -> Result<(), DbspError> {
self.operator.clear_state()
}
// Replay hooks: forwarded unchanged.
fn start_replay(&mut self) -> Result<(), DbspError> {
self.operator.start_replay()
}
fn is_replay_complete(&self) -> bool {
self.operator.is_replay_complete()
}
fn end_replay(&mut self) -> Result<(), DbspError> {
self.operator.end_replay()
}
/// Inserts or overwrites label `key`.
fn set_label(&mut self, key: &str, value: &str) {
self.labels.insert(key.to_string(), value.to_string());
}
/// Looks up label `key`.
fn get_label(&self, key: &str) -> Option<&str> {
self.labels.get(key).map(|s| s.as_str())
}
fn labels(&self) -> &BTreeMap<String, String> {
&self.labels
}
fn as_any(&self) -> &dyn Any {
self
}
}
/// Circuit node that wraps a sink operator: one input stream in circuit
/// `C` and no output stream.
struct SinkNode<C, I, Op> {
    /// Global id of this node.
    id: GlobalNodeId,
    /// The wrapped operator.
    operator: Op,
    /// Stream the operator reads from.
    input_stream: Stream<C, I>,
    /// Key/value labels attached to the node.
    labels: BTreeMap<String, String>,
}
impl<C, I, Op> SinkNode<C, I, Op>
where
    Op: SinkOperator<I>,
    C: Circuit,
{
    /// Creates a sink node with local id `id` in `circuit`, reading from
    /// `input_stream`.
    fn new(operator: Op, input_stream: Stream<C, I>, circuit: C, id: NodeId) -> Self {
        let global_id = circuit.global_node_id().child(id);

        Self {
            id: global_id,
            operator,
            input_stream,
            labels: BTreeMap::new(),
        }
    }
}
/// `Node` implementation for `SinkNode`. Apart from `eval` and the label
/// accessors, every method forwards directly to the wrapped operator.
impl<C, I, Op> Node for SinkNode<C, I, Op>
where
C: Circuit,
I: Clone + 'static,
Op: SinkOperator<I>,
{
fn name(&self) -> Cow<'static, str> {
self.operator.name()
}
/// Local id of the node, i.e. the last component of its global id.
fn local_id(&self) -> NodeId {
self.id.local_node_id().unwrap()
}
fn global_id(&self) -> &GlobalNodeId {
&self.id
}
fn is_async(&self) -> bool {
self.operator.is_async()
}
fn is_input(&self) -> bool {
self.operator.is_input()
}
fn ready(&self) -> bool {
self.operator.ready()
}
fn register_ready_callback(&mut self, cb: Box<dyn Fn() + Send + Sync>) {
self.operator.register_ready_callback(cb);
}
/// Feeds the current input value to the operator; nothing is written to
/// any output stream.
///
/// If `StreamValue::take` yields the input by value, the operator's
/// `eval_owned` is used; otherwise the operator sees a borrowed value via
/// `eval`. A token is consumed on the input stream afterwards and the
/// operator's flush progress is returned.
// The borrow obtained from `get()` is held across the `.await` below,
// hence the lint exemption.
#[allow(clippy::await_holding_refcell_ref)]
fn eval<'a>(
&'a mut self,
) -> Pin<Box<dyn Future<Output = Result<Option<Position>, SchedulerError>> + 'a>> {
Box::pin(async {
match StreamValue::take(self.input_stream.val()) {
Some(v) => self.operator.eval_owned(v).await,
None => {
self.operator
.eval(StreamValue::peek(&self.input_stream.get()))
.await
}
};
// Runs only after the `match` statement (and any borrow it created) ended.
StreamValue::consume_token(self.input_stream.val());
Ok(self.operator.flush_progress())
})
}
// Transaction, flush, clock and metadata hooks: forwarded unchanged.
fn start_transaction(&mut self) {
self.operator.start_transaction();
}
fn flush(&mut self) {
self.operator.flush();
}
fn is_flush_complete(&self) -> bool {
self.operator.is_flush_complete()
}
fn clock_start(&mut self, scope: Scope) {
self.operator.clock_start(scope);
}
fn clock_end(&mut self, scope: Scope) {
self.operator.clock_end(scope);
}
/// Initializes the operator with this node's global id.
fn init(&mut self) {
self.operator.init(&self.id);
}
fn metadata(&self, output: &mut OperatorMeta) {
self.operator.metadata(output);
}
fn fixedpoint(&self, scope: Scope) -> bool {
self.operator.fixedpoint(scope)
}
/// Checkpoints the operator under `base`, keyed by this node's
/// persistent id.
fn checkpoint(
&mut self,
base: &StoragePath,
files: &mut Vec<Arc<dyn FileCommitter>>,
) -> Result<(), DbspError> {
self.operator
.checkpoint(base, self.persistent_id().as_deref(), files)
}
/// Restores the operator from `base`, keyed by this node's persistent id.
fn restore(&mut self, base: &StoragePath) -> Result<(), DbspError> {
self.operator.restore(base, self.persistent_id().as_deref())
}
fn clear_state(&mut self) -> Result<(), DbspError> {
self.operator.clear_state()
}
// Replay hooks: forwarded unchanged.
fn start_replay(&mut self) -> Result<(), DbspError> {
self.operator.start_replay()
}
fn is_replay_complete(&self) -> bool {
self.operator.is_replay_complete()
}
fn end_replay(&mut self) -> Result<(), DbspError> {
self.operator.end_replay()
}
/// Inserts or overwrites label `key`.
fn set_label(&mut self, key: &str, value: &str) {
self.labels.insert(key.to_string(), value.to_string());
}
/// Looks up label `key`.
fn get_label(&self, key: &str) -> Option<&str> {
self.labels.get(key).map(|s| s.as_str())
}
fn labels(&self) -> &BTreeMap<String, String> {
&self.labels
}
fn as_any(&self) -> &dyn Any {
self
}
}
/// Circuit node that wraps a binary sink operator: two input streams in
/// circuit `C` and no output stream.
struct BinarySinkNode<C, I1, I2, Op> {
    /// Global id of this node.
    id: GlobalNodeId,
    /// The wrapped operator.
    operator: Op,
    /// First input stream.
    input_stream1: Stream<C, I1>,
    /// Second input stream.
    input_stream2: Stream<C, I2>,
    /// `true` when both inputs are the same stream (checked with `ptr_eq`
    /// at construction time).
    is_alias: bool,
    /// Key/value labels attached to the node.
    labels: BTreeMap<String, String>,
}
impl<C, I1, I2, Op> BinarySinkNode<C, I1, I2, Op>
where
    I1: Clone,
    I2: Clone,
    Op: BinarySinkOperator<I1, I2>,
    C: Circuit,
{
    /// Creates a binary sink node with local id `id` in `circuit`.
    ///
    /// Records whether the two inputs are the same stream so that `eval`
    /// can avoid taking ownership of a value it needs to read twice.
    fn new(
        operator: Op,
        input_stream1: Stream<C, I1>,
        input_stream2: Stream<C, I2>,
        circuit: C,
        id: NodeId,
    ) -> Self {
        let same_stream = input_stream1.ptr_eq(&input_stream2);
        let global_id = circuit.global_node_id().child(id);

        Self {
            id: global_id,
            operator,
            input_stream1,
            input_stream2,
            is_alias: same_stream,
            labels: BTreeMap::new(),
        }
    }
}
/// `Node` implementation for `BinarySinkNode`. Apart from `eval` and the
/// label accessors, every method forwards directly to the wrapped operator.
impl<C, I1, I2, Op> Node for BinarySinkNode<C, I1, I2, Op>
where
C: Circuit,
I1: Clone + 'static,
I2: Clone + 'static,
Op: BinarySinkOperator<I1, I2>,
{
fn name(&self) -> Cow<'static, str> {
self.operator.name()
}
/// Local id of the node, i.e. the last component of its global id.
fn local_id(&self) -> NodeId {
self.id.local_node_id().unwrap()
}
fn global_id(&self) -> &GlobalNodeId {
&self.id
}
fn is_async(&self) -> bool {
self.operator.is_async()
}
fn is_input(&self) -> bool {
self.operator.is_input()
}
fn ready(&self) -> bool {
self.operator.ready()
}
fn register_ready_callback(&mut self, cb: Box<dyn Fn() + Send + Sync>) {
self.operator.register_ready_callback(cb);
}
/// Feeds the current values of both inputs to the operator as `Cow`s;
/// nothing is written to any output stream.
///
/// When both inputs are the same stream (`is_alias`), both arguments are
/// borrowed. Otherwise each input is passed as `Cow::Owned` if
/// `StreamValue::take` yields it by value and as `Cow::Borrowed` if not.
/// In every case a token is consumed on each input afterwards and the
/// operator's flush progress is returned.
// Borrows obtained from `get()` are held across `.await` points below,
// hence the lint exemption.
#[allow(clippy::await_holding_refcell_ref)]
fn eval<'a>(
&'a mut self,
) -> Pin<Box<dyn Future<Output = Result<Option<Position>, SchedulerError>> + 'a>> {
Box::pin(async {
if self.is_alias {
// Aliased inputs: never take ownership, the same value is read twice.
// The inner block ends both borrows before the tokens are consumed.
{
let val1 = self.input_stream1.get();
let val2 = self.input_stream2.get();
self.operator
.eval(
Cow::Borrowed(StreamValue::peek(&val1)),
Cow::Borrowed(StreamValue::peek(&val2)),
)
.await;
}
StreamValue::consume_token(self.input_stream1.val());
StreamValue::consume_token(self.input_stream2.val());
} else {
// Distinct inputs: try to take each value, then dispatch on which
// of the two could be taken.
let val1 = StreamValue::take(self.input_stream1.val());
let val2 = StreamValue::take(self.input_stream2.val());
match (val1, val2) {
(Some(val1), Some(val2)) => {
self.operator.eval(Cow::Owned(val1), Cow::Owned(val2)).await;
}
(Some(val1), None) => {
self.operator
.eval(
Cow::Owned(val1),
Cow::Borrowed(StreamValue::peek(&self.input_stream2.get())),
)
.await;
}
(None, Some(val2)) => {
self.operator
.eval(
Cow::Borrowed(StreamValue::peek(&self.input_stream1.get())),
Cow::Owned(val2),
)
.await;
}
(None, None) => {
self.operator
.eval(
Cow::Borrowed(StreamValue::peek(&self.input_stream1.get())),
Cow::Borrowed(StreamValue::peek(&self.input_stream2.get())),
)
.await;
}
}
StreamValue::consume_token(self.input_stream1.val());
StreamValue::consume_token(self.input_stream2.val());
};
Ok(self.operator.flush_progress())
})
}
// Transaction, flush, clock and metadata hooks: forwarded unchanged.
fn start_transaction(&mut self) {
self.operator.start_transaction();
}
fn flush(&mut self) {
self.operator.flush();
}
fn is_flush_complete(&self) -> bool {
self.operator.is_flush_complete()
}
fn clock_start(&mut self, scope: Scope) {
self.operator.clock_start(scope);
}
fn clock_end(&mut self, scope: Scope) {
self.operator.clock_end(scope);
}
/// Initializes the operator with this node's global id.
fn init(&mut self) {
self.operator.init(&self.id);
}
fn metadata(&self, output: &mut OperatorMeta) {
self.operator.metadata(output);
}
fn fixedpoint(&self, scope: Scope) -> bool {
self.operator.fixedpoint(scope)
}
/// Checkpoints the operator under `base`, keyed by this node's
/// persistent id.
fn checkpoint(
&mut self,
base: &StoragePath,
files: &mut Vec<Arc<dyn FileCommitter>>,
) -> Result<(), DbspError> {
self.operator
.checkpoint(base, self.persistent_id().as_deref(), files)
}
/// Restores the operator from `base`, keyed by this node's persistent id.
fn restore(&mut self, base: &StoragePath) -> Result<(), DbspError> {
self.operator.restore(base, self.persistent_id().as_deref())
}
fn clear_state(&mut self) -> Result<(), DbspError> {
self.operator.clear_state()
}
// Replay hooks: forwarded unchanged.
fn start_replay(&mut self) -> Result<(), DbspError> {
self.operator.start_replay()
}
fn is_replay_complete(&self) -> bool {
self.operator.is_replay_complete()
}
fn end_replay(&mut self) -> Result<(), DbspError> {
self.operator.end_replay()
}
/// Inserts or overwrites label `key`.
fn set_label(&mut self, key: &str, value: &str) {
self.labels.insert(key.to_string(), value.to_string());
}
/// Looks up label `key`.
fn get_label(&self, key: &str) -> Option<&str> {
self.labels.get(key).map(|s| s.as_str())
}
fn labels(&self) -> &BTreeMap<String, String> {
&self.labels
}
fn as_any(&self) -> &dyn Any {
self
}
}
/// Circuit node that wraps a ternary sink operator: three input streams in
/// circuit `C` and no output stream.
struct TernarySinkNode<C, I1, I2, I3, Op> {
    /// Global id of this node.
    id: GlobalNodeId,
    /// The wrapped operator.
    operator: Op,
    /// First input stream.
    input_stream1: Stream<C, I1>,
    /// Second input stream.
    input_stream2: Stream<C, I2>,
    /// Third input stream.
    input_stream3: Stream<C, I3>,
    /// Key/value labels attached to the node.
    labels: BTreeMap<String, String>,
}
impl<C, I1, I2, I3, Op> TernarySinkNode<C, I1, I2, I3, Op>
where
    I1: Clone,
    I2: Clone,
    I3: Clone,
    Op: TernarySinkOperator<I1, I2, I3>,
    C: Circuit,
{
    /// Creates a ternary sink node with local id `id` in `circuit`.
    ///
    /// # Panics
    ///
    /// Panics if any two of the three input streams are the same stream
    /// (as reported by `ptr_eq`).
    fn new(
        operator: Op,
        input_stream1: Stream<C, I1>,
        input_stream2: Stream<C, I2>,
        input_stream3: Stream<C, I3>,
        circuit: C,
        id: NodeId,
    ) -> Self {
        // Aliased inputs are not supported by this node: reject each pair.
        assert!(!input_stream1.ptr_eq(&input_stream2));
        assert!(!input_stream1.ptr_eq(&input_stream3));
        assert!(!input_stream2.ptr_eq(&input_stream3));

        let global_id = circuit.global_node_id().child(id);

        Self {
            id: global_id,
            operator,
            input_stream1,
            input_stream2,
            input_stream3,
            labels: BTreeMap::new(),
        }
    }
}
/// `Node` implementation for `TernarySinkNode`. Apart from `eval` and the
/// label accessors, every method forwards directly to the wrapped operator.
impl<C, I1, I2, I3, Op> Node for TernarySinkNode<C, I1, I2, I3, Op>
where
C: Circuit,
I1: Clone + 'static,
I2: Clone + 'static,
I3: Clone + 'static,
Op: TernarySinkOperator<I1, I2, I3>,
{
fn name(&self) -> Cow<'static, str> {
self.operator.name()
}
/// Local id of the node, i.e. the last component of its global id.
fn local_id(&self) -> NodeId {
self.id.local_node_id().unwrap()
}
fn global_id(&self) -> &GlobalNodeId {
&self.id
}
fn is_async(&self) -> bool {
self.operator.is_async()
}
fn is_input(&self) -> bool {
self.operator.is_input()
}
fn ready(&self) -> bool {
self.operator.ready()
}
fn register_ready_callback(&mut self, cb: Box<dyn Fn() + Send + Sync>) {
self.operator.register_ready_callback(cb);
}
/// Feeds the current values of the three inputs to the operator as
/// `Cow`s; nothing is written to any output stream.
///
/// Each input is passed as `Cow::Owned` if `StreamValue::take` yields it
/// by value and as `Cow::Borrowed` otherwise. A token is consumed on each
/// input afterwards and the operator's flush progress is returned.
// Borrows obtained from `get()` are held across the `.await` below,
// hence the lint exemption.
#[allow(clippy::await_holding_refcell_ref)]
fn eval<'a>(
&'a mut self,
) -> Pin<Box<dyn Future<Output = Result<Option<Position>, SchedulerError>> + 'a>> {
Box::pin(async {
// For each input: first try to take the value, then acquire a borrow
// that serves as the fallback when nothing was taken.
let val1 = StreamValue::take(self.input_stream1.val()).map(|val| Cow::Owned(val));
let r1 = self.input_stream1.get();
let val2 = StreamValue::take(self.input_stream2.val()).map(|val| Cow::Owned(val));
let r2 = self.input_stream2.get();
let val3 = StreamValue::take(self.input_stream3.val()).map(|val| Cow::Owned(val));
let r3 = self.input_stream3.get();
self.operator
.eval(
val1.unwrap_or_else(|| Cow::Borrowed(StreamValue::peek(&r1))),
val2.unwrap_or_else(|| Cow::Borrowed(StreamValue::peek(&r2))),
val3.unwrap_or_else(|| Cow::Borrowed(StreamValue::peek(&r3))),
)
.await;
// Release the borrows explicitly before the tokens are consumed.
drop(r1);
drop(r2);
drop(r3);
StreamValue::consume_token(self.input_stream1.val());
StreamValue::consume_token(self.input_stream2.val());
StreamValue::consume_token(self.input_stream3.val());
Ok(self.operator.flush_progress())
})
}
// Transaction, flush, clock and metadata hooks: forwarded unchanged.
fn start_transaction(&mut self) {
self.operator.start_transaction();
}
fn flush(&mut self) {
self.operator.flush();
}
fn is_flush_complete(&self) -> bool {
self.operator.is_flush_complete()
}
fn clock_start(&mut self, scope: Scope) {
self.operator.clock_start(scope);
}
fn clock_end(&mut self, scope: Scope) {
self.operator.clock_end(scope);
}
/// Initializes the operator with this node's global id.
fn init(&mut self) {
self.operator.init(&self.id);
}
fn metadata(&self, output: &mut OperatorMeta) {
self.operator.metadata(output);
}
fn fixedpoint(&self, scope: Scope) -> bool {
self.operator.fixedpoint(scope)
}
/// Checkpoints the operator under `base`, keyed by this node's
/// persistent id.
fn checkpoint(
&mut self,
base: &StoragePath,
files: &mut Vec<Arc<dyn FileCommitter>>,
) -> Result<(), DbspError> {
self.operator
.checkpoint(base, self.persistent_id().as_deref(), files)
}
/// Restores the operator from `base`, keyed by this node's persistent id.
fn restore(&mut self, base: &StoragePath) -> Result<(), DbspError> {
self.operator.restore(base, self.persistent_id().as_deref())
}
fn clear_state(&mut self) -> Result<(), DbspError> {
self.operator.clear_state()
}
// Replay hooks: forwarded unchanged.
fn start_replay(&mut self) -> Result<(), DbspError> {
self.operator.start_replay()
}
fn is_replay_complete(&self) -> bool {
self.operator.is_replay_complete()
}
fn end_replay(&mut self) -> Result<(), DbspError> {
self.operator.end_replay()
}
/// Inserts or overwrites label `key`.
fn set_label(&mut self, key: &str, value: &str) {
self.labels.insert(key.to_string(), value.to_string());
}
/// Looks up label `key`.
fn get_label(&self, key: &str) -> Option<&str> {
self.labels.get(key).map(|s| s.as_str())
}
fn labels(&self) -> &BTreeMap<String, String> {
&self.labels
}
fn as_any(&self) -> &dyn Any {
self
}
}
/// Circuit node that wraps a binary operator: two input streams and one
/// output stream, all in circuit `C`.
struct BinaryNode<C, I1, I2, O, Op> {
    /// Global id of this node.
    id: GlobalNodeId,
    /// The wrapped operator.
    operator: Op,
    /// First input stream.
    input_stream1: Stream<C, I1>,
    /// Second input stream.
    input_stream2: Stream<C, I2>,
    /// Stream that receives the operator's output.
    output_stream: Stream<C, O>,
    /// `true` when both inputs are the same stream (checked with `ptr_eq`
    /// at construction time).
    is_alias: bool,
    /// Key/value labels attached to the node.
    labels: BTreeMap<String, String>,
}
impl<C, I1, I2, O, Op> BinaryNode<C, I1, I2, O, Op>
where
    Op: BinaryOperator<I1, I2, O>,
    C: Circuit,
{
    /// Creates a binary node with local id `id` in `circuit`.
    ///
    /// Records whether the two inputs are the same stream so that `eval`
    /// can avoid taking ownership of a value it needs to read twice.
    fn new(
        operator: Op,
        input_stream1: Stream<C, I1>,
        input_stream2: Stream<C, I2>,
        circuit: C,
        id: NodeId,
    ) -> Self {
        let same_stream = input_stream1.ptr_eq(&input_stream2);
        let global_id = circuit.global_node_id().child(id);
        let output_stream = Stream::new(circuit, id);

        Self {
            id: global_id,
            operator,
            input_stream1,
            input_stream2,
            is_alias: same_stream,
            output_stream,
            labels: BTreeMap::new(),
        }
    }

    /// Returns a clone of the handle to this node's output stream.
    fn output_stream(&self) -> Stream<C, O> {
        self.output_stream.clone()
    }
}
/// `Node` implementation for `BinaryNode`. Apart from `eval` and the label
/// accessors, every method forwards directly to the wrapped operator.
impl<C, I1, I2, O, Op> Node for BinaryNode<C, I1, I2, O, Op>
where
C: Circuit,
I1: Clone + 'static,
I2: Clone + 'static,
O: Clone + 'static,
Op: BinaryOperator<I1, I2, O>,
{
fn name(&self) -> Cow<'static, str> {
self.operator.name()
}
/// Local id of the node, i.e. the last component of its global id.
fn local_id(&self) -> NodeId {
self.id.local_node_id().unwrap()
}
fn global_id(&self) -> &GlobalNodeId {
&self.id
}
fn is_async(&self) -> bool {
self.operator.is_async()
}
fn is_input(&self) -> bool {
self.operator.is_input()
}
fn ready(&self) -> bool {
self.operator.ready()
}
fn register_ready_callback(&mut self, cb: Box<dyn Fn() + Send + Sync>) {
self.operator.register_ready_callback(cb);
}
/// Evaluates the operator on the current values of both inputs and
/// stores the result in the output stream.
///
/// When both inputs are the same stream (`is_alias`), both arguments are
/// borrowed and `eval` is used. Otherwise the operator method is chosen
/// by which inputs `StreamValue::take` yields by value: `eval_owned`,
/// `eval_owned_and_ref`, `eval_ref_and_owned` or `eval`. In every case a
/// token is consumed on each input afterwards and the operator's flush
/// progress is returned.
// Borrows obtained from `get()` are held across `.await` points below,
// hence the lint exemption.
#[allow(clippy::await_holding_refcell_ref)]
fn eval<'a>(
&'a mut self,
) -> Pin<Box<dyn Future<Output = Result<Option<Position>, SchedulerError>> + 'a>> {
Box::pin(async {
if self.is_alias {
// Aliased inputs: never take ownership, the same value is read twice.
// The inner block ends both borrows before the tokens are consumed.
{
let val1 = self.input_stream1.get();
let val2 = self.input_stream2.get();
self.output_stream.put(
self.operator
.eval(StreamValue::peek(&val1), StreamValue::peek(&val2))
.await,
);
}
StreamValue::consume_token(self.input_stream1.val());
StreamValue::consume_token(self.input_stream2.val());
} else {
// Distinct inputs: try to take each value, then dispatch on which
// of the two could be taken.
let val1 = StreamValue::take(self.input_stream1.val());
let val2 = StreamValue::take(self.input_stream2.val());
self.output_stream.put(match (val1, val2) {
(Some(val1), Some(val2)) => self.operator.eval_owned(val1, val2).await,
(Some(val1), None) => {
self.operator
.eval_owned_and_ref(val1, StreamValue::peek(&self.input_stream2.get()))
.await
}
(None, Some(val2)) => {
self.operator
.eval_ref_and_owned(StreamValue::peek(&self.input_stream1.get()), val2)
.await
}
(None, None) => {
self.operator
.eval(
StreamValue::peek(&self.input_stream1.get()),
StreamValue::peek(&self.input_stream2.get()),
)
.await
}
});
StreamValue::consume_token(self.input_stream1.val());
StreamValue::consume_token(self.input_stream2.val());
}
Ok(self.operator.flush_progress())
})
}
// Transaction, flush, clock and metadata hooks: forwarded unchanged.
fn start_transaction(&mut self) {
self.operator.start_transaction();
}
fn flush(&mut self) {
self.operator.flush();
}
fn is_flush_complete(&self) -> bool {
self.operator.is_flush_complete()
}
fn clock_start(&mut self, scope: Scope) {
self.operator.clock_start(scope);
}
fn clock_end(&mut self, scope: Scope) {
self.operator.clock_end(scope);
}
/// Initializes the operator with this node's global id.
fn init(&mut self) {
self.operator.init(&self.id);
}
fn metadata(&self, output: &mut OperatorMeta) {
self.operator.metadata(output);
}
fn fixedpoint(&self, scope: Scope) -> bool {
self.operator.fixedpoint(scope)
}
/// Checkpoints the operator under `base`, keyed by this node's
/// persistent id.
fn checkpoint(
&mut self,
base: &StoragePath,
files: &mut Vec<Arc<dyn FileCommitter>>,
) -> Result<(), DbspError> {
self.operator
.checkpoint(base, self.persistent_id().as_deref(), files)
}
/// Restores the operator from `base`, keyed by this node's persistent id.
fn restore(&mut self, base: &StoragePath) -> Result<(), DbspError> {
self.operator.restore(base, self.persistent_id().as_deref())
}
fn clear_state(&mut self) -> Result<(), DbspError> {
self.operator.clear_state()
}
// Replay hooks: forwarded unchanged.
fn start_replay(&mut self) -> Result<(), DbspError> {
self.operator.start_replay()
}
fn is_replay_complete(&self) -> bool {
self.operator.is_replay_complete()
}
fn end_replay(&mut self) -> Result<(), DbspError> {
self.operator.end_replay()
}
/// Inserts or overwrites label `key`.
fn set_label(&mut self, key: &str, value: &str) {
self.labels.insert(key.to_string(), value.to_string());
}
/// Looks up label `key`.
fn get_label(&self, key: &str) -> Option<&str> {
self.labels.get(key).map(|s| s.as_str())
}
fn labels(&self) -> &BTreeMap<String, String> {
&self.labels
}
fn as_any(&self) -> &dyn Any {
self
}
}
/// Circuit node that wraps a ternary operator: three input streams and one
/// output stream, all in circuit `C`.
struct TernaryNode<C, I1, I2, I3, O, Op> {
    /// Global id of this node.
    id: GlobalNodeId,
    /// The wrapped operator.
    operator: Op,
    /// First input stream.
    input_stream1: Stream<C, I1>,
    /// Second input stream.
    input_stream2: Stream<C, I2>,
    /// Third input stream.
    input_stream3: Stream<C, I3>,
    /// Stream that receives the operator's output.
    output_stream: Stream<C, O>,
    /// Key/value labels attached to the node.
    labels: BTreeMap<String, String>,
}
impl<C, I1, I2, I3, O, Op> TernaryNode<C, I1, I2, I3, O, Op>
where
    I1: Clone,
    I2: Clone,
    I3: Clone,
    Op: TernaryOperator<I1, I2, I3, O>,
    C: Circuit,
{
    /// Creates a ternary node with local id `id` in `circuit`.
    fn new(
        operator: Op,
        input_stream1: Stream<C, I1>,
        input_stream2: Stream<C, I2>,
        input_stream3: Stream<C, I3>,
        circuit: C,
        id: NodeId,
    ) -> Self {
        let global_id = circuit.global_node_id().child(id);
        let output_stream = Stream::new(circuit, id);

        Self {
            id: global_id,
            operator,
            input_stream1,
            input_stream2,
            input_stream3,
            output_stream,
            labels: BTreeMap::new(),
        }
    }

    /// Returns a clone of the handle to this node's output stream.
    fn output_stream(&self) -> Stream<C, O> {
        self.output_stream.clone()
    }
}
/// `Node` implementation for `TernaryNode`. Apart from `eval` and the label
/// accessors, every method forwards directly to the wrapped operator.
impl<C, I1, I2, I3, O, Op> Node for TernaryNode<C, I1, I2, I3, O, Op>
where
C: Circuit,
I1: Clone + 'static,
I2: Clone + 'static,
I3: Clone + 'static,
O: Clone + 'static,
Op: TernaryOperator<I1, I2, I3, O>,
{
fn name(&self) -> Cow<'static, str> {
self.operator.name()
}
/// Local id of the node, i.e. the last component of its global id.
fn local_id(&self) -> NodeId {
self.id.local_node_id().unwrap()
}
fn global_id(&self) -> &GlobalNodeId {
&self.id
}
fn is_async(&self) -> bool {
self.operator.is_async()
}
fn is_input(&self) -> bool {
self.operator.is_input()
}
fn ready(&self) -> bool {
self.operator.ready()
}
fn register_ready_callback(&mut self, cb: Box<dyn Fn() + Send + Sync>) {
self.operator.register_ready_callback(cb);
}
/// Evaluates the operator on the current values of the three inputs and
/// stores the result in the output stream.
///
/// All three inputs are always passed as `Cow::Borrowed`; this node never
/// takes ownership of an input value. A token is consumed on each input
/// afterwards and the operator's flush progress is returned.
// Borrows obtained from `get()` are held across the `.await` below,
// hence the lint exemption.
#[allow(clippy::await_holding_refcell_ref)]
fn eval<'a>(
&'a mut self,
) -> Pin<Box<dyn Future<Output = Result<Option<Position>, SchedulerError>> + 'a>> {
Box::pin(async {
// The inner block ends the input borrows before the tokens are consumed.
{
self.output_stream.put(
self.operator
.eval(
Cow::Borrowed(StreamValue::peek(&self.input_stream1.get())),
Cow::Borrowed(StreamValue::peek(&self.input_stream2.get())),
Cow::Borrowed(StreamValue::peek(&self.input_stream3.get())),
)
.await,
);
}
StreamValue::consume_token(self.input_stream1.val());
StreamValue::consume_token(self.input_stream2.val());
StreamValue::consume_token(self.input_stream3.val());
Ok(self.operator.flush_progress())
})
}
// Transaction, flush, clock and metadata hooks: forwarded unchanged.
fn start_transaction(&mut self) {
self.operator.start_transaction();
}
fn flush(&mut self) {
self.operator.flush();
}
fn is_flush_complete(&self) -> bool {
self.operator.is_flush_complete()
}
fn clock_start(&mut self, scope: Scope) {
self.operator.clock_start(scope);
}
fn clock_end(&mut self, scope: Scope) {
self.operator.clock_end(scope);
}
/// Initializes the operator with this node's global id.
fn init(&mut self) {
self.operator.init(&self.id);
}
fn metadata(&self, output: &mut OperatorMeta) {
self.operator.metadata(output);
}
fn fixedpoint(&self, scope: Scope) -> bool {
self.operator.fixedpoint(scope)
}
/// Checkpoints the operator under `base`, keyed by this node's
/// persistent id.
fn checkpoint(
&mut self,
base: &StoragePath,
files: &mut Vec<Arc<dyn FileCommitter>>,
) -> Result<(), DbspError> {
self.operator
.checkpoint(base, self.persistent_id().as_deref(), files)
}
/// Restores the operator from `base`, keyed by this node's persistent id.
fn restore(&mut self, base: &StoragePath) -> Result<(), DbspError> {
self.operator.restore(base, self.persistent_id().as_deref())
}
fn clear_state(&mut self) -> Result<(), DbspError> {
self.operator.clear_state()
}
// Replay hooks: forwarded unchanged.
fn start_replay(&mut self) -> Result<(), DbspError> {
self.operator.start_replay()
}
fn is_replay_complete(&self) -> bool {
self.operator.is_replay_complete()
}
fn end_replay(&mut self) -> Result<(), DbspError> {
self.operator.end_replay()
}
/// Inserts or overwrites label `key`.
fn set_label(&mut self, key: &str, value: &str) {
self.labels.insert(key.to_string(), value.to_string());
}
/// Looks up label `key`.
fn get_label(&self, key: &str) -> Option<&str> {
self.labels.get(key).map(|s| s.as_str())
}
fn labels(&self) -> &BTreeMap<String, String> {
&self.labels
}
fn as_any(&self) -> &dyn Any {
self
}
}
/// Circuit node that wraps a quaternary operator: four input streams and
/// one output stream, all in circuit `C`.
struct QuaternaryNode<C, I1, I2, I3, I4, O, Op> {
    /// Global id of this node.
    id: GlobalNodeId,
    /// The wrapped operator.
    operator: Op,
    /// First input stream.
    input_stream1: Stream<C, I1>,
    /// Second input stream.
    input_stream2: Stream<C, I2>,
    /// Third input stream.
    input_stream3: Stream<C, I3>,
    /// Fourth input stream.
    input_stream4: Stream<C, I4>,
    /// Stream that receives the operator's output.
    output_stream: Stream<C, O>,
    /// Key/value labels attached to the node.
    labels: BTreeMap<String, String>,
}
impl<C, I1, I2, I3, I4, O, Op> QuaternaryNode<C, I1, I2, I3, I4, O, Op>
where
    I1: Clone,
    I2: Clone,
    I3: Clone,
    I4: Clone,
    Op: QuaternaryOperator<I1, I2, I3, I4, O>,
    C: Circuit,
{
    /// Creates a quaternary node with local id `id` in `circuit`.
    fn new(
        operator: Op,
        input_stream1: Stream<C, I1>,
        input_stream2: Stream<C, I2>,
        input_stream3: Stream<C, I3>,
        input_stream4: Stream<C, I4>,
        circuit: C,
        id: NodeId,
    ) -> Self {
        let global_id = circuit.global_node_id().child(id);
        let output_stream = Stream::new(circuit, id);

        Self {
            id: global_id,
            operator,
            input_stream1,
            input_stream2,
            input_stream3,
            input_stream4,
            output_stream,
            labels: BTreeMap::new(),
        }
    }

    /// Returns a clone of the handle to this node's output stream.
    fn output_stream(&self) -> Stream<C, O> {
        self.output_stream.clone()
    }
}
// `Node` implementation for `QuaternaryNode`. Apart from `eval` and the label
// accessors, every method forwards to the wrapped operator.
impl<C, I1, I2, I3, I4, O, Op> Node for QuaternaryNode<C, I1, I2, I3, I4, O, Op>
where
C: Circuit,
I1: Clone + 'static,
I2: Clone + 'static,
I3: Clone + 'static,
I4: Clone + 'static,
O: Clone + 'static,
Op: QuaternaryOperator<I1, I2, I3, I4, O>,
{
// Name reported by the wrapped operator.
fn name(&self) -> Cow<'static, str> {
self.operator.name()
}
// Local id of this node; panics if the global id has no local component.
fn local_id(&self) -> NodeId {
self.id.local_node_id().unwrap()
}
fn global_id(&self) -> &GlobalNodeId {
&self.id
}
fn is_async(&self) -> bool {
self.operator.is_async()
}
fn is_input(&self) -> bool {
self.operator.is_input()
}
fn ready(&self) -> bool {
self.operator.ready()
}
fn register_ready_callback(&mut self, cb: Box<dyn Fn() + Send + Sync>) {
self.operator.register_ready_callback(cb);
}
// Evaluates the operator on borrowed views of the four current input values,
// stores the result in the output stream, then consumes this node's token on
// each input. Returns whatever `flush_progress` reports for the operator.
//
// The values returned by `get()` on the input streams are held across the
// `.await`, hence the clippy allow below.
#[allow(clippy::await_holding_refcell_ref)]
fn eval<'a>(
&'a mut self,
) -> Pin<Box<dyn Future<Output = Result<Option<Position>, SchedulerError>> + 'a>> {
Box::pin(async {
{
// The temporaries created by `get()` live until the end of this `put`
// statement, i.e. they are released before the tokens are consumed below.
self.output_stream.put(
self.operator
.eval(
Cow::Borrowed(StreamValue::peek(&self.input_stream1.get())),
Cow::Borrowed(StreamValue::peek(&self.input_stream2.get())),
Cow::Borrowed(StreamValue::peek(&self.input_stream3.get())),
Cow::Borrowed(StreamValue::peek(&self.input_stream4.get())),
)
.await,
);
}
// One token per input stream is consumed on every evaluation.
StreamValue::consume_token(self.input_stream1.val());
StreamValue::consume_token(self.input_stream2.val());
StreamValue::consume_token(self.input_stream3.val());
StreamValue::consume_token(self.input_stream4.val());
Ok(self.operator.flush_progress())
})
}
fn start_transaction(&mut self) {
self.operator.start_transaction();
}
fn flush(&mut self) {
self.operator.flush();
}
fn is_flush_complete(&self) -> bool {
self.operator.is_flush_complete()
}
fn clock_start(&mut self, scope: Scope) {
self.operator.clock_start(scope);
}
fn clock_end(&mut self, scope: Scope) {
self.operator.clock_end(scope);
}
// Initializes the operator with this node's global id.
fn init(&mut self) {
self.operator.init(&self.id);
}
fn metadata(&self, output: &mut OperatorMeta) {
self.operator.metadata(output);
}
fn fixedpoint(&self, scope: Scope) -> bool {
self.operator.fixedpoint(scope)
}
// Checkpoints the operator under `base`, passing along the value of
// `self.persistent_id()` (if any) and the list of files to commit.
fn checkpoint(
&mut self,
base: &StoragePath,
files: &mut Vec<Arc<dyn FileCommitter>>,
) -> Result<(), DbspError> {
self.operator
.checkpoint(base, self.persistent_id().as_deref(), files)
}
// Restores the operator from `base`, passing along `self.persistent_id()`.
fn restore(&mut self, base: &StoragePath) -> Result<(), DbspError> {
self.operator.restore(base, self.persistent_id().as_deref())
}
fn clear_state(&mut self) -> Result<(), DbspError> {
self.operator.clear_state()
}
fn start_replay(&mut self) -> Result<(), DbspError> {
self.operator.start_replay()
}
fn is_replay_complete(&self) -> bool {
self.operator.is_replay_complete()
}
fn end_replay(&mut self) -> Result<(), DbspError> {
self.operator.end_replay()
}
// Inserts or overwrites the label `key` with `value`.
fn set_label(&mut self, key: &str, value: &str) {
self.labels.insert(key.to_string(), value.to_string());
}
// Looks up the label `key`, if it has been set.
fn get_label(&self, key: &str) -> Option<&str> {
self.labels.get(key).map(|s| s.as_str())
}
fn labels(&self) -> &BTreeMap<String, String> {
&self.labels
}
fn as_any(&self) -> &dyn Any {
self
}
}
/// Circuit node that wraps an operator with a variable number of input
/// streams of the same type and one output stream.
struct NaryNode<C, I, O, Op>
where
I: Clone + 'static,
{
/// Global id of this node: the owning circuit's global id extended with the
/// node's local id.
id: GlobalNodeId,
/// The wrapped n-ary operator.
operator: Op,
/// Input streams, in the order they were supplied to the constructor.
input_streams: Vec<Stream<C, I>>,
/// Stream that receives the operator's result on every evaluation.
output_stream: Stream<C, O>,
/// Free-form key/value labels attached to this node.
labels: BTreeMap<String, String>,
}
impl<C, I, O, Op> NaryNode<C, I, O, Op>
where
    I: Clone + 'static,
    Op: NaryOperator<I, O>,
    C: Circuit,
{
    /// Builds a node around `operator` that reads from every stream yielded by
    /// `input_streams` and owns a freshly created output stream in `circuit`.
    ///
    /// `id` is the node's local id within `circuit`.
    fn new<Iter>(operator: Op, input_streams: Iter, circuit: C, id: NodeId) -> Self
    where
        Iter: IntoIterator<Item = Stream<C, I>>,
    {
        // The set of inputs never changes after construction, so give back any
        // excess capacity right away.
        let mut inputs: Vec<Stream<C, I>> = input_streams.into_iter().collect();
        inputs.shrink_to_fit();

        // Derive the global id first: `Stream::new` takes ownership of `circuit`.
        let global_id = circuit.global_node_id().child(id);
        let output_stream = Stream::new(circuit, id);

        Self {
            id: global_id,
            operator,
            input_streams: inputs,
            output_stream,
            labels: BTreeMap::new(),
        }
    }

    /// Returns a handle to the stream this node writes its output to.
    fn output_stream(&self) -> Stream<C, O> {
        self.output_stream.clone()
    }
}
// `Node` implementation for `NaryNode`. Apart from `eval` and the label
// accessors, every method forwards to the wrapped operator.
impl<C, I, O, Op> Node for NaryNode<C, I, O, Op>
where
C: Circuit,
I: Clone,
O: Clone + 'static,
Op: NaryOperator<I, O>,
{
// Name reported by the wrapped operator.
fn name(&self) -> Cow<'static, str> {
self.operator.name()
}
// Local id of this node; panics if the global id has no local component.
fn local_id(&self) -> NodeId {
self.id.local_node_id().unwrap()
}
fn global_id(&self) -> &GlobalNodeId {
&self.id
}
fn is_async(&self) -> bool {
self.operator.is_async()
}
fn is_input(&self) -> bool {
self.operator.is_input()
}
fn ready(&self) -> bool {
self.operator.ready()
}
fn register_ready_callback(&mut self, cb: Box<dyn Fn() + Send + Sync>) {
self.operator.register_ready_callback(cb);
}
// Evaluates the operator on borrowed views of all current input values,
// stores the result in the output stream, then consumes this node's token on
// each input. Returns whatever `flush_progress` reports for the operator.
fn eval<'a>(
&'a mut self,
) -> Pin<Box<dyn Future<Output = Result<Option<Position>, SchedulerError>> + 'a>> {
Box::pin(async {
// Acquire one handle per input stream up front so that all of them stay
// alive while the operator iterates over the borrowed values.
let refs = self
.input_streams
.iter()
.map(|stream| stream.get())
.collect::<Vec<_>>();
self.output_stream.put(
self.operator
.eval(refs.iter().map(|r| Cow::Borrowed(StreamValue::peek(r))))
.await,
);
// Release the input handles explicitly before consuming the tokens.
std::mem::drop(refs);
for i in self.input_streams.iter() {
StreamValue::consume_token(i.val());
}
Ok(self.operator.flush_progress())
})
}
fn start_transaction(&mut self) {
self.operator.start_transaction();
}
fn flush(&mut self) {
self.operator.flush();
}
fn is_flush_complete(&self) -> bool {
self.operator.is_flush_complete()
}
fn clock_start(&mut self, scope: Scope) {
self.operator.clock_start(scope);
}
fn clock_end(&mut self, scope: Scope) {
self.operator.clock_end(scope);
}
// Initializes the operator with this node's global id.
fn init(&mut self) {
self.operator.init(&self.id);
}
fn metadata(&self, output: &mut OperatorMeta) {
self.operator.metadata(output);
}
fn fixedpoint(&self, scope: Scope) -> bool {
self.operator.fixedpoint(scope)
}
// Checkpoints the operator under `base`, passing along the value of
// `self.persistent_id()` (if any) and the list of files to commit.
fn checkpoint(
&mut self,
base: &StoragePath,
files: &mut Vec<Arc<dyn FileCommitter>>,
) -> Result<(), DbspError> {
self.operator
.checkpoint(base, self.persistent_id().as_deref(), files)
}
// Restores the operator from `base`, passing along `self.persistent_id()`.
fn restore(&mut self, base: &StoragePath) -> Result<(), DbspError> {
self.operator.restore(base, self.persistent_id().as_deref())
}
fn clear_state(&mut self) -> Result<(), DbspError> {
self.operator.clear_state()
}
fn start_replay(&mut self) -> Result<(), DbspError> {
self.operator.start_replay()
}
fn is_replay_complete(&self) -> bool {
self.operator.is_replay_complete()
}
fn end_replay(&mut self) -> Result<(), DbspError> {
self.operator.end_replay()
}
// Inserts or overwrites the label `key` with `value`.
fn set_label(&mut self, key: &str, value: &str) {
self.labels.insert(key.to_string(), value.to_string());
}
// Looks up the label `key`, if it has been set.
fn get_label(&self, key: &str) -> Option<&str> {
self.labels.get(key).map(|s| s.as_str())
}
fn labels(&self) -> &BTreeMap<String, String> {
&self.labels
}
fn as_any(&self) -> &dyn Any {
self
}
}
/// Output half of a feedback operator: on evaluation it publishes the value
/// returned by the shared strict operator's `get_output`.
struct FeedbackOutputNode<C, I, O, Op>
where
C: Circuit,
{
/// Global id of this node: the owning circuit's global id extended with the
/// node's local id.
id: GlobalNodeId,
/// The strict operator, shared (via `Rc<RefCell<_>>`) with the input half
/// of the feedback pair.
operator: Rc<RefCell<Op>>,
/// Stream in the local circuit that receives the operator's output.
output_stream: Stream<C, O>,
/// Optional stream in the parent circuit; when present it receives the
/// operator's final output at the end of the scope-0 clock epoch.
export_stream: Option<Stream<C::Parent, O>>,
/// Ties the otherwise unused input type parameter `I` to the struct.
phantom_input: PhantomData<I>,
/// Free-form key/value labels attached to this node.
labels: BTreeMap<String, String>,
}
impl<C, I, O, Op> FeedbackOutputNode<C, I, O, Op>
where
    C: Circuit,
    Op: StrictUnaryOperator<I, O>,
{
    /// Builds the output half of a feedback operator without an export stream.
    ///
    /// `operator` is the shared strict operator and `id` is the node's local id
    /// within `circuit`.
    fn new(operator: Rc<RefCell<Op>>, circuit: C, id: NodeId) -> Self {
        Self {
            id: circuit.global_node_id().child(id),
            operator,
            // `circuit` is owned and not used after this point, so it is moved
            // into the stream rather than cloned.
            output_stream: Stream::new(circuit, id),
            export_stream: None,
            phantom_input: PhantomData,
            labels: BTreeMap::new(),
        }
    }

    /// Same as [`Self::new`], but additionally creates an export stream that
    /// lives in the parent circuit and originates from this node.
    fn with_export(operator: Rc<RefCell<Op>>, circuit: C, id: NodeId) -> Self {
        // `circuit` is still needed below to describe the export stream, hence
        // the clone here.
        let mut result = Self::new(operator, circuit.clone(), id);
        result.export_stream = Some(Stream::with_origin(
            circuit.parent(),
            circuit.allocate_stream_id(),
            circuit.node_id(),
            GlobalNodeId::child_of(&circuit, id),
        ));
        result
    }

    /// Returns a handle to the local stream this node writes its output to.
    fn output_stream(&self) -> Stream<C, O> {
        self.output_stream.clone()
    }
}
// `Node` implementation for the output half of a feedback operator. The
// operator is shared behind a `RefCell`, so every call borrows it for the
// duration of that single call only.
impl<C, I, O, Op> Node for FeedbackOutputNode<C, I, O, Op>
where
C: Circuit,
I: Data,
O: Clone + 'static,
Op: StrictUnaryOperator<I, O>,
{
// Local id of this node; panics if the global id has no local component.
fn local_id(&self) -> NodeId {
self.id.local_node_id().unwrap()
}
fn global_id(&self) -> &GlobalNodeId {
&self.id
}
// Name reported by the shared operator.
fn name(&self) -> Cow<'static, str> {
self.operator.borrow().name()
}
fn is_async(&self) -> bool {
self.operator.borrow().is_async()
}
fn is_input(&self) -> bool {
self.operator.borrow().is_input()
}
fn ready(&self) -> bool {
self.operator.borrow().ready()
}
fn register_ready_callback(&mut self, cb: Box<dyn Fn() + Send + Sync>) {
self.operator.borrow_mut().register_ready_callback(cb);
}
// Publishes the operator's current output to the local output stream.
// Reads no input stream and always reports `None` as progress.
fn eval<'a>(
&'a mut self,
) -> Pin<Box<dyn Future<Output = Result<Option<Position>, SchedulerError>> + 'a>> {
Box::pin(async {
self.output_stream
.put(self.operator.borrow_mut().get_output());
Ok(None)
})
}
fn start_transaction(&mut self) {
self.operator.borrow_mut().start_transaction();
}
fn flush(&mut self) {
self.operator.borrow_mut().flush();
}
fn is_flush_complete(&self) -> bool {
self.operator.borrow().is_flush_complete()
}
fn clock_start(&mut self, scope: Scope) {
self.operator.borrow_mut().clock_start(scope)
}
// At the end of the scope-0 clock epoch, and only if an export stream exists,
// the operator's final output is written to it. This happens before the
// operator itself is notified of the clock end.
fn clock_end(&mut self, scope: Scope) {
if scope == 0
&& let Some(export_stream) = &mut self.export_stream
{
export_stream.put(self.operator.borrow_mut().get_final_output());
}
self.operator.borrow_mut().clock_end(scope);
}
// Initializes the shared operator with this node's global id.
fn init(&mut self) {
self.operator.borrow_mut().init(&self.id);
}
// Intentionally reports nothing.
// NOTE(review): presumably this avoids reporting the shared operator's
// metadata twice, since `FeedbackInputNode::metadata` forwards to the same
// operator -- confirm.
fn metadata(&self, _output: &mut OperatorMeta) {
}
fn fixedpoint(&self, scope: Scope) -> bool {
self.operator.borrow().fixedpoint(scope)
}
// Checkpoints the shared operator under `base`, passing along the value of
// `self.persistent_id()` (if any) and the list of files to commit.
fn checkpoint(
&mut self,
base: &StoragePath,
files: &mut Vec<Arc<dyn FileCommitter>>,
) -> Result<(), DbspError> {
self.operator
.borrow_mut()
.checkpoint(base, self.persistent_id().as_deref(), files)
}
// Restores the shared operator from `base`, passing along
// `self.persistent_id()`.
fn restore(&mut self, base: &StoragePath) -> Result<(), DbspError> {
self.operator
.borrow_mut()
.restore(base, self.persistent_id().as_deref())
}
fn clear_state(&mut self) -> Result<(), DbspError> {
self.operator.borrow_mut().clear_state()
}
fn start_replay(&mut self) -> Result<(), DbspError> {
self.operator.borrow_mut().start_replay()
}
fn is_replay_complete(&self) -> bool {
self.operator.borrow().is_replay_complete()
}
fn end_replay(&mut self) -> Result<(), DbspError> {
self.operator.borrow_mut().end_replay()
}
// Inserts or overwrites the label `key` with `value`.
fn set_label(&mut self, key: &str, value: &str) {
self.labels.insert(key.to_string(), value.to_string());
}
// Looks up the label `key`, if it has been set.
fn get_label(&self, key: &str) -> Option<&str> {
self.labels.get(key).map(|s| s.as_str())
}
fn labels(&self) -> &BTreeMap<String, String> {
&self.labels
}
fn as_any(&self) -> &dyn Any {
self
}
}
/// Input half of a feedback operator: on evaluation it feeds the current
/// value of its input stream to the shared strict operator.
struct FeedbackInputNode<C, I, O, Op> {
/// Global id of this node: the global id of the input stream's circuit
/// extended with the node's local id.
id: GlobalNodeId,
/// The strict operator, shared (via `Rc<RefCell<_>>`) with the output half
/// of the feedback pair.
operator: Rc<RefCell<Op>>,
/// Stream whose values are fed into the operator.
input_stream: Stream<C, I>,
/// Ties the otherwise unused output type parameter `O` to the struct.
phantom_output: PhantomData<O>,
/// Free-form key/value labels attached to this node.
labels: BTreeMap<String, String>,
}
impl<C, I, O, Op> FeedbackInputNode<C, I, O, Op>
where
    Op: StrictUnaryOperator<I, O>,
    C: Circuit,
{
    /// Builds the input half of a feedback operator.
    ///
    /// The node belongs to the circuit that owns `input_stream`; `id` is its
    /// local id within that circuit.
    fn new(operator: Rc<RefCell<Op>>, input_stream: Stream<C, I>, id: NodeId) -> Self {
        // Compute the global id before `input_stream` is moved into the node.
        let global_id = input_stream.circuit().global_node_id().child(id);

        Self {
            id: global_id,
            operator,
            input_stream,
            phantom_output: PhantomData,
            labels: BTreeMap::new(),
        }
    }
}
// `Node` implementation for the input half of a feedback operator. The
// operator is shared behind a `RefCell` with the output half.
impl<C, I, O, Op> Node for FeedbackInputNode<C, I, O, Op>
where
Op: StrictUnaryOperator<I, O>,
I: Data,
O: 'static,
C: Clone + 'static,
{
// Name reported by the shared operator.
fn name(&self) -> Cow<'static, str> {
self.operator.borrow().name()
}
// Local id of this node; panics if the global id has no local component.
fn local_id(&self) -> NodeId {
self.id.local_node_id().unwrap()
}
fn global_id(&self) -> &GlobalNodeId {
&self.id
}
fn is_async(&self) -> bool {
self.operator.borrow().is_async()
}
fn is_input(&self) -> bool {
self.operator.borrow().is_input()
}
fn ready(&self) -> bool {
self.operator.borrow().ready()
}
fn register_ready_callback(&mut self, cb: Box<dyn Fn() + Send + Sync>) {
self.operator.borrow_mut().register_ready_callback(cb);
}
// Feeds the current input value to the operator and always reports `None`
// as progress.
//
// The mutable borrow of the shared operator is held across the `.await`,
// hence the clippy allow below.
#[allow(clippy::await_holding_refcell_ref)]
fn eval<'a>(
&'a mut self,
) -> Pin<Box<dyn Future<Output = Result<Option<Position>, SchedulerError>> + 'a>> {
Box::pin(async {
// Pass the value by ownership when `StreamValue::take` hands it over;
// otherwise fall back to evaluating on a borrowed view.
match StreamValue::take(self.input_stream.val()) {
Some(v) => self.operator.borrow_mut().eval_strict_owned(v).await,
None => {
self.operator
.borrow_mut()
.eval_strict(StreamValue::peek(&self.input_stream.get()))
.await
}
};
// The token is consumed in both cases.
StreamValue::consume_token(self.input_stream.val());
Ok(None)
})
}
fn start_transaction(&mut self) {
self.operator.borrow_mut().start_transaction();
}
fn flush(&mut self) {
self.operator.borrow_mut().flush();
}
fn is_flush_complete(&self) -> bool {
self.operator.borrow().is_flush_complete()
}
// Clock events are not forwarded from this half.
// NOTE(review): presumably because `FeedbackOutputNode` already forwards
// them to the shared operator -- confirm.
fn clock_start(&mut self, _scope: Scope) {}
fn clock_end(&mut self, _scope: Scope) {}
// Initializes the shared operator with this node's global id.
fn init(&mut self) {
self.operator.borrow_mut().init(&self.id);
}
fn metadata(&self, output: &mut OperatorMeta) {
self.operator.borrow().metadata(output)
}
fn fixedpoint(&self, scope: Scope) -> bool {
self.operator.borrow().fixedpoint(scope)
}
// Checkpoint, restore and clear-state are no-ops on this half.
// NOTE(review): presumably the shared operator's state is handled through
// `FeedbackOutputNode`, which forwards these calls -- confirm.
fn checkpoint(
&mut self,
_base: &StoragePath,
_files: &mut Vec<Arc<dyn FileCommitter>>,
) -> Result<(), DbspError> {
Ok(())
}
fn restore(&mut self, _base: &StoragePath) -> Result<(), DbspError> {
Ok(())
}
fn clear_state(&mut self) -> Result<(), DbspError> {
Ok(())
}
fn start_replay(&mut self) -> Result<(), DbspError> {
self.operator.borrow_mut().start_replay()
}
fn is_replay_complete(&self) -> bool {
self.operator.borrow().is_replay_complete()
}
fn end_replay(&mut self) -> Result<(), DbspError> {
self.operator.borrow_mut().end_replay()
}
// Inserts or overwrites the label `key` with `value`.
fn set_label(&mut self, key: &str, value: &str) {
self.labels.insert(key.to_string(), value.to_string());
}
// Looks up the label `key`, if it has been set.
fn get_label(&self, key: &str) -> Option<&str> {
self.labels.get(key).map(|s| s.as_str())
}
fn labels(&self) -> &BTreeMap<String, String> {
&self.labels
}
fn as_any(&self) -> &dyn Any {
self
}
}
/// Handle used to close a feedback loop: it remembers the output node and the
/// shared operator until the input stream is supplied via `connect` or
/// `connect_with_preference`.
pub struct FeedbackConnector<C, I, O, Op> {
/// Local id of the feedback operator's output node.
output_node_id: NodeId,
/// Circuit on which the feedback connection is made.
circuit: C,
/// The strict operator shared between the two halves of the feedback pair.
operator: Rc<RefCell<Op>>,
/// Ties the otherwise unused input type parameter `I` to the struct.
phantom_input: PhantomData<I>,
/// Ties the otherwise unused output type parameter `O` to the struct.
phantom_output: PhantomData<O>,
}
impl<C, I, O, Op> FeedbackConnector<C, I, O, Op>
where
Op: StrictUnaryOperator<I, O>,
{
/// Creates a connector for the feedback operator whose output node has local
/// id `output_node_id` in `circuit`.
fn new(output_node_id: NodeId, circuit: C, operator: Rc<RefCell<Op>>) -> Self {
Self {
output_node_id,
circuit,
operator,
phantom_input: PhantomData,
phantom_output: PhantomData,
}
}
}
impl<C, I, O, Op> FeedbackConnector<C, I, O, Op>
where
    Op: StrictUnaryOperator<I, O>,
    I: Data,
    O: Data,
    C: Circuit,
{
    /// Mutably borrows the shared feedback operator.
    ///
    /// # Panics
    ///
    /// Panics if the operator is already borrowed (standard `RefCell` rules).
    pub fn operator_mut(&self) -> RefMut<'_, Op> {
        self.operator.borrow_mut()
    }

    /// Closes the feedback loop by connecting `input_stream` to the operator,
    /// using `OwnershipPreference::INDIFFERENT`.
    pub fn connect(self, input_stream: &Stream<C, I>) {
        self.connect_with_preference(input_stream, OwnershipPreference::INDIFFERENT)
    }

    /// Closes the feedback loop by connecting `input_stream` to the operator
    /// with the given ownership preference for the input.
    ///
    /// Consumes the connector.
    pub fn connect_with_preference(
        self,
        input_stream: &Stream<C, I>,
        input_preference: OwnershipPreference,
    ) {
        let Self {
            output_node_id,
            circuit,
            operator,
            ..
        } = self;

        circuit.connect_feedback_with_preference(
            output_node_id,
            operator,
            input_stream,
            input_preference,
        )
    }
}
/// Node that embeds a nested circuit (reported under the name "Subcircuit")
/// together with the executor that runs it.
struct ChildNode<C>
where
C: Circuit,
{
/// Global id of the nested circuit.
id: GlobalNodeId,
/// The nested circuit itself.
circuit: C,
/// Executor that runs one transaction of the nested circuit per evaluation.
executor: Box<dyn Executor<C>>,
/// Free-form key/value labels attached to this node.
labels: BTreeMap<String, String>,
/// Offset added to the scope argument before clock and fixedpoint calls are
/// forwarded to the nested circuit.
nesting_depth: Scope,
}
// Clears the nested circuit when the node is dropped.
// NOTE(review): presumably this breaks reference cycles inside the circuit so
// that its nodes are actually freed -- confirm against `clear`.
impl<C> Drop for ChildNode<C>
where
C: Circuit,
{
fn drop(&mut self) {
self.circuit.clear();
}
}
impl<C> ChildNode<C>
where
    C: Circuit,
{
    /// Wraps the nested `circuit` and the `executor` that drives it.
    ///
    /// `nesting_depth` is added to the scope of every clock and fixedpoint
    /// call that this node forwards to the nested circuit.
    fn new<E>(circuit: C, nesting_depth: Scope, executor: E) -> Self
    where
        E: Executor<C>,
    {
        let id = circuit.global_node_id();
        // Type-erase the executor so that `ChildNode` is generic over the
        // circuit type only.
        let executor: Box<dyn Executor<C>> = Box::new(executor);

        Self {
            id,
            circuit,
            executor,
            labels: BTreeMap::new(),
            nesting_depth,
        }
    }
}
// `Node` implementation that makes a nested circuit look like a single node
// of its parent circuit.
impl<C> Node for ChildNode<C>
where
C: Circuit,
{
fn name(&self) -> Cow<'static, str> {
Cow::Borrowed("Subcircuit")
}
// Local id of this node; panics if the global id has no local component.
fn local_id(&self) -> NodeId {
self.id.local_node_id().unwrap()
}
fn global_id(&self) -> &GlobalNodeId {
&self.id
}
fn is_circuit(&self) -> bool {
true
}
fn is_async(&self) -> bool {
false
}
fn is_input(&self) -> bool {
false
}
fn ready(&self) -> bool {
true
}
// Runs one transaction of the nested circuit and reports `None` as progress.
fn eval<'a>(
&'a mut self,
) -> Pin<Box<dyn Future<Output = Result<Option<Position>, SchedulerError>> + 'a>> {
// Import nodes are evaluated right away, when `eval` is called, not when
// the returned future is first polled.
for node_id in self.circuit.import_nodes() {
self.circuit.eval_import_node(node_id)
}
Box::pin(async {
self.executor.transaction(&self.circuit).await?;
Ok(None)
})
}
// Nothing is done here at transaction start.
fn start_transaction(&mut self) {
}
fn flush(&mut self) {
self.executor.flush();
}
fn is_flush_complete(&self) -> bool {
self.executor.is_flush_complete()
}
// Clock and fixedpoint calls are forwarded with the scope shifted by
// `nesting_depth`.
fn clock_start(&mut self, scope: Scope) {
self.circuit.clock_start(scope + self.nesting_depth);
}
fn clock_end(&mut self, scope: Scope) {
self.circuit.clock_end(scope + self.nesting_depth);
}
// The subcircuit node itself reports no metadata.
fn metadata(&self, _meta: &mut OperatorMeta) {}
fn fixedpoint(&self, scope: Scope) -> bool {
self.circuit.check_fixedpoint(scope + self.nesting_depth)
}
// Applies `f` to the nodes of the nested circuit, recursively.
fn map_nodes_recursive(
&self,
f: &mut dyn FnMut(&dyn Node) -> Result<(), DbspError>,
) -> Result<(), DbspError> {
self.circuit.map_nodes_recursive(f)
}
// Checkpoint and restore are no-ops for the subcircuit node itself.
// NOTE(review): nested operators are presumably reached individually through
// the recursive node traversal used by `CircuitHandle` -- confirm.
fn checkpoint(
&mut self,
_base: &StoragePath,
_files: &mut Vec<Arc<dyn FileCommitter>>,
) -> Result<(), DbspError> {
Ok(())
}
fn restore(&mut self, _base: &StoragePath) -> Result<(), DbspError> {
Ok(())
}
// Clears the state of every local node of the nested circuit.
fn clear_state(&mut self) -> Result<(), DbspError> {
self.circuit
.map_local_nodes_mut(&mut |node| node.clear_state())
}
// Replay is trivially complete for a subcircuit node: start/end do nothing.
fn start_replay(&mut self) -> Result<(), DbspError> {
Ok(())
}
fn is_replay_complete(&self) -> bool {
true
}
fn end_replay(&mut self) -> Result<(), DbspError> {
Ok(())
}
// Inserts or overwrites the label `key` with `value`.
fn set_label(&mut self, key: &str, value: &str) {
self.labels.insert(key.to_string(), value.to_string());
}
// Looks up the label `key`, if it has been set.
fn get_label(&self, key: &str) -> Option<&str> {
self.labels.get(key).map(|s| s.as_str())
}
fn labels(&self) -> &BTreeMap<String, String> {
&self.labels
}
// Applies `f` to the node found at `path`, relative to the nested circuit.
fn map_child(&self, path: &[NodeId], f: &mut dyn FnMut(&dyn Node)) {
self.circuit.map_node_relative(path, f);
}
// Mutable counterpart of `map_child`.
fn map_child_mut(&self, path: &[NodeId], f: &mut dyn FnMut(&mut dyn Node)) {
self.circuit.map_node_mut_relative(path, f);
}
fn as_any(&self) -> &dyn Any {
self
}
// Exposes the nested circuit.
fn as_circuit(&self) -> Option<&dyn CircuitBase> {
Some(&self.circuit)
}
}
/// Owning handle to a root circuit together with the executor and the Tokio
/// runtime used to run it.
pub struct CircuitHandle {
/// The root circuit.
circuit: RootCircuit,
/// Executor that runs transactions and steps of the root circuit.
executor: Box<dyn Executor<RootCircuit>>,
/// Runtime on which the executor's futures are driven to completion.
tokio_runtime: TokioRuntime,
/// Set by `restore` when a replay/backfill is required and taken again by
/// `complete_replay`; `None` otherwise.
replay_info: Option<BootstrapInfo>,
}
impl Drop for CircuitHandle {
    /// Emits the final `clock_end` scheduler event, ends the scope-0 clock
    /// epoch unless the thread is unwinding from a panic, and clears the
    /// circuit.
    fn drop(&mut self) {
        let final_event = SchedulerEvent::clock_end();
        self.circuit.log_scheduler_event(&final_event);

        // NOTE(review): `clock_end` is skipped while panicking, presumably to
        // avoid running operator code (and risking a second panic) during
        // unwinding -- confirm.
        let unwinding = panicking();
        if !unwinding {
            self.circuit.clock_end(0);
        }

        self.circuit.clear();
    }
}
/// Summary of the replay/backfill work determined by `CircuitHandle::restore`.
#[derive(Debug, Clone, PartialEq, Eq, Serialize)]
pub struct BootstrapInfo {
/// Nodes that act as replay sources, mapped to the id of the replay stream
/// each of them feeds.
pub replay_sources: BTreeMap<NodeId, StreamId>,
/// Nodes that are backfilled, mapped to the value of their persistent-id
/// label, if they have one.
#[allow(dead_code)]
pub need_backfill: BTreeMap<NodeId, Option<String>>,
}
impl CircuitHandle {
/// Runs one complete transaction of the circuit, blocking the calling thread
/// until it finishes.
///
/// # Errors
///
/// Scheduler errors are returned wrapped in `DbspError::Scheduler`.
pub fn transaction(&self) -> Result<(), DbspError> {
    // The executor future is driven inside a `LocalSet` on the handle's
    // runtime.
    let run = async {
        let local_tasks = LocalSet::new();
        local_tasks
            .run_until(async { self.executor.transaction(&self.circuit).await })
            .await
    };

    self.tokio_runtime
        .block_on(run)
        .map_err(DbspError::Scheduler)
}
/// Starts a transaction on the circuit, blocking the calling thread until the
/// executor's `start_transaction` future completes.
///
/// # Errors
///
/// Scheduler errors are returned wrapped in `DbspError::Scheduler`.
pub fn start_transaction(&self) -> Result<(), DbspError> {
    // The executor future is driven inside a `LocalSet` on the handle's
    // runtime.
    let run = async {
        let local_tasks = LocalSet::new();
        local_tasks
            .run_until(async { self.executor.start_transaction(&self.circuit).await })
            .await
    };

    self.tokio_runtime
        .block_on(run)
        .map_err(DbspError::Scheduler)
}
/// Asks the executor to start committing the current transaction.
///
/// # Errors
///
/// Scheduler errors are returned wrapped in `DbspError::Scheduler`.
pub fn start_commit_transaction(&self) -> Result<(), DbspError> {
self.executor
.start_commit_transaction()
.map_err(DbspError::Scheduler)
}
/// Returns whether the executor reports the commit as complete.
pub fn is_commit_complete(&self) -> bool {
self.executor.is_commit_complete()
}
/// Returns the commit progress as reported by the executor.
pub fn commit_progress(&self) -> CommitProgress {
self.executor.commit_progress()
}
/// Runs a single step of the circuit, blocking the calling thread until the
/// executor's `step` future completes.
///
/// # Errors
///
/// Scheduler errors are returned wrapped in `DbspError::Scheduler`.
pub fn step(&self) -> Result<(), DbspError> {
    // The executor future is driven inside a `LocalSet` on the handle's
    // runtime.
    let run = async {
        let local_tasks = LocalSet::new();
        local_tasks
            .run_until(async { self.executor.step(&self.circuit).await })
            .await
    };

    self.tokio_runtime
        .block_on(run)
        .map_err(DbspError::Scheduler)
}
/// Checkpoints every node of the circuit, recursively, under `base`.
///
/// Each node's `checkpoint` receives the shared `files` list. The call is
/// made through `DBSP_OPERATOR_COMMIT_LATENCY_MICROSECONDS.record_callback`
/// and inside a `SamplySpan` labelled with the node's name and global id.
///
/// # Errors
///
/// Returns the error produced by the recursive node traversal.
pub fn checkpoint(
&mut self,
base: &StoragePath,
files: &mut Vec<Arc<dyn FileCommitter>>,
) -> Result<(), DbspError> {
self.circuit
.map_nodes_recursive_mut(&mut |node: &mut dyn Node| {
// The span value is bound to `_span` so it stays alive until the closure
// returns, i.e. for the duration of this node's checkpoint.
let _span = SamplySpan::new("operator")
.with_category("Checkpoint")
.with_tooltip(|| format!("{} {}", node.name(), node.global_id()));
DBSP_OPERATOR_COMMIT_LATENCY_MICROSECONDS
.record_callback(|| node.checkpoint(base, files))
})
}
/// Restores every node of the circuit from the checkpoint under `base` and
/// works out which parts of the circuit must be replayed or backfilled.
///
/// The procedure is:
///
/// 1. Call `restore` on every node, recursively. Outside of
///    `Mode::Ephemeral`, a `NotFound` storage or I/O error marks the node as
///    needing backfill instead of failing the restore; every other error is
///    propagated. In `Mode::Ephemeral` all errors are propagated.
/// 2. Add the nodes reported by `invalidate_balancer_clusters`.
/// 3. Reduce the nodes to their top-level ancestors and repeatedly apply
///    `compute_replay_nodes_step` until no new participating node is found.
/// 4. If any node participates: start replay on the replay sources, clear the
///    state of the remaining participating nodes, and prepare the executor to
///    run only the participating nodes.
///
/// # Returns
///
/// `Ok(Some(info))` describing the replay sources and the backfilled nodes
/// when a replay is required (the same value is stored in the handle), or
/// `Ok(None)` when every node was restored and nothing needs backfill.
///
/// # Errors
///
/// Propagates restore errors as described above, as well as errors from
/// starting replay, clearing state, and preparing the executor.
///
/// # Panics
///
/// Panics if a node selected as a replay source is itself in the set of
/// nodes that need backfill.
pub fn restore(&mut self, base: &StoragePath) -> Result<Option<BootstrapInfo>, DbspError> {
    let mut replay_sources: BTreeMap<NodeId, StreamId> = BTreeMap::new();
    let mut need_backfill: BTreeSet<GlobalNodeId> = BTreeSet::new();

    // Step 1: restore each node, collecting the ones without checkpointed state.
    self.circuit.map_nodes_recursive_mut(
        &mut |node: &mut dyn Node| match node.restore(base) {
            Err(e) if Runtime::mode() == Mode::Ephemeral => Err(e),
            Err(DbspError::Storage(ioerror)) if ioerror.kind() == ErrorKind::NotFound => {
                need_backfill.insert(node.global_id().clone());
                Ok(())
            }
            Err(DbspError::IO(ioerror)) if ioerror.kind() == ErrorKind::NotFound => {
                need_backfill.insert(node.global_id().clone());
                Ok(())
            }
            Err(e) => Err(e),
            Ok(()) => Ok(()),
        },
    )?;

    // Step 2: nodes that must additionally be backfilled because of the balancer.
    let additional_need_backfill: BTreeSet<GlobalNodeId> =
        self.invalidate_balancer_clusters(&need_backfill);
    if Runtime::worker_index() == 0 {
        debug!(
            "CircuitHandle::restore: additional need backfill: {:?}",
            additional_need_backfill
        );
    }
    need_backfill.extend(additional_need_backfill);

    debug!(
        "worker {}: CircuitHandle::restore: found {} operators that require backfill: {:?}",
        Runtime::worker_index(),
        need_backfill.len(),
        need_backfill.iter().cloned().collect::<Vec<GlobalNodeId>>()
    );

    // Step 3: from here on only top-level nodes are considered.
    let need_backfill = need_backfill
        .into_iter()
        .map(|gid| gid.top_level_ancestor())
        .collect::<BTreeSet<_>>();

    // Grow the set of participating nodes until a step adds nothing new.
    let mut participate_in_backfill = need_backfill.clone();
    let mut participate_in_backfill_new = need_backfill.clone();
    while !participate_in_backfill_new.is_empty() {
        participate_in_backfill_new = self.compute_replay_nodes_step(
            &mut replay_sources,
            &need_backfill,
            participate_in_backfill_new,
            &mut participate_in_backfill,
        )?;
    }

    debug!(
        "worker {}: CircuitHandle::restore: replaying {} operators: {:?}\n backfilling {} operators: {:?}\n replay circuit consists of {} operators: {:?}",
        Runtime::worker_index(),
        replay_sources.len(),
        replay_sources.keys().cloned().collect::<Vec<NodeId>>(),
        need_backfill.len(),
        need_backfill.iter().cloned().collect::<Vec<NodeId>>(),
        participate_in_backfill.len(),
        participate_in_backfill
            .iter()
            .cloned()
            .collect::<Vec<NodeId>>()
    );

    // The replay-source ids are needed twice below; build the set once.
    let replay_node_ids: BTreeSet<NodeId> = replay_sources.keys().cloned().collect();
    assert!(
        replay_node_ids.is_disjoint(&need_backfill),
        "CircuitHandle::restore: a replay source must not itself require backfill"
    );

    // Every participating node that is not a replay source gets backfilled.
    let nodes_to_backfill = participate_in_backfill
        .difference(&replay_node_ids)
        .cloned()
        .collect::<BTreeSet<_>>();

    if participate_in_backfill.is_empty() {
        return Ok(None);
    }

    // Step 4: put the circuit into replay mode.
    for node_id in replay_sources.keys() {
        self.circuit
            .map_local_node_mut(*node_id, &mut |node| node.start_replay())?;
    }
    for node_id in nodes_to_backfill.iter() {
        self.circuit
            .map_local_node_mut(*node_id, &mut |node| node.clear_state())?;
    }
    self.executor
        .prepare(&self.circuit, Some(&participate_in_backfill))?;

    // Report each backfilled node together with its persistent-id label.
    let need_backfill = nodes_to_backfill
        .iter()
        .map(|node_id| {
            let pid = self.circuit.map_local_node_mut(*node_id, &mut |node| {
                node.get_label(LABEL_PERSISTENT_OPERATOR_ID)
                    .map(|s| s.to_string())
            });
            (*node_id, pid)
        })
        .collect::<BTreeMap<_, _>>();

    // `replay_sources` is not used after this point, so it is moved.
    let replay_info = BootstrapInfo {
        replay_sources,
        need_backfill,
    };
    self.replay_info = Some(replay_info.clone());
    Ok(Some(replay_info))
}
/// Asks the balancer which additional nodes must be backfilled given that
/// the nodes in `need_backfill` are, and returns those nodes together with
/// everything `propagate_need_backfill_forward` reaches from them.
///
/// Nodes are handled by the top-level ancestor of their global id; the
/// result is expressed as global ids directly under the root.
fn invalidate_balancer_clusters(
    &self,
    need_backfill: &BTreeSet<GlobalNodeId>,
) -> BTreeSet<GlobalNodeId> {
    let top_level_ids: BTreeSet<NodeId> = need_backfill
        .iter()
        .map(|gid| gid.top_level_ancestor())
        .collect();

    let invalidated = self
        .circuit
        .balancer()
        .invalidate_clusters_for_bootstrapping(&top_level_ids);

    // Only nodes that were not already scheduled for backfill are propagated.
    let newly_invalidated: BTreeSet<NodeId> =
        invalidated.difference(&top_level_ids).cloned().collect();

    self.propagate_need_backfill_forward(newly_invalidated)
        .into_iter()
        .map(|node_id| GlobalNodeId::root().child(node_id))
        .collect()
}
/// Expands `need_backfill` with every node reachable from it by following
/// outgoing edges of any kind and incoming dependency edges, transitively.
///
/// Returns the expanded set, which always contains the original nodes.
fn propagate_need_backfill_forward(
    &self,
    mut need_backfill: BTreeSet<NodeId>,
) -> BTreeSet<NodeId> {
    let mut pending: Vec<NodeId> = need_backfill.iter().cloned().collect();
    let mut visited = BTreeSet::new();

    while let Some(current) = pending.pop() {
        // `insert` returns `false` when the node has been expanded already.
        if !visited.insert(current) {
            continue;
        }

        // Gather successors first, then dependency predecessors, into an owned
        // list so the edge table is no longer borrowed while the sets change.
        let neighbors: Vec<NodeId> = {
            let edges = self.circuit.edges();
            let successors = edges
                .by_source
                .get(&current)
                .into_iter()
                .flat_map(|outgoing| outgoing.iter().map(|edge| edge.to));
            let dependencies = edges
                .by_destination
                .get(&current)
                .into_iter()
                .flat_map(|incoming| incoming.iter())
                .filter(|edge| edge.is_dependency())
                .map(|edge| edge.from);
            let gathered: Vec<NodeId> = successors.chain(dependencies).collect();
            gathered
        };

        for neighbor in neighbors {
            if !visited.contains(&neighbor) {
                pending.push(neighbor);
                need_backfill.insert(neighbor);
            }
        }
    }

    need_backfill
}
/// Performs one expansion step of the set of nodes that take part in
/// backfill.
///
/// For every node in `participate_in_backfill_new`, the step looks at the
/// producers of its input streams and at both ends of its dependency edges.
/// Where an input stream has a replay source that is not itself in
/// `need_backfill`, the replay source takes the producer's place.
///
/// Nodes not yet in `participate_in_backfill` are added to it and returned
/// as the new frontier. For each replay source seen for the first time,
/// replay edges are added to the circuit and its replay stream id is
/// recorded in `replay_sources`.
///
/// This function currently always returns `Ok`.
fn compute_replay_nodes_step(
    &self,
    replay_sources: &mut BTreeMap<NodeId, StreamId>,
    need_backfill: &BTreeSet<NodeId>,
    participate_in_backfill_new: BTreeSet<NodeId>,
    participate_in_backfill: &mut BTreeSet<NodeId>,
) -> Result<BTreeSet<NodeId>, DbspError> {
    // Candidate nodes as `(stream feeding the frontier node, node)` pairs;
    // the stream is `None` for dependency edges.
    let mut inputs = BTreeSet::new();
    for node_id in &participate_in_backfill_new {
        inputs.extend(
            self.circuit
                .edges()
                .by_destination
                .get(node_id)
                .into_iter()
                .flat_map(|edges| edges.iter())
                .filter(|edge| edge.is_stream())
                // `is_stream` edges carry a stream id, so the unwrap is expected to hold.
                .map(|edge| (Some(edge.stream_id().unwrap()), edge.from)),
        );

        for edge in self.circuit.edges().dependencies_of(*node_id) {
            inputs.insert((None, edge.from));
        }
        for edge in self.circuit.edges().depend_on(*node_id) {
            inputs.insert((None, edge.to));
        }
    }

    let mut frontier = BTreeSet::new();
    let mut replay_streams = BTreeMap::new();
    for (stream_id, mut node_id) in inputs {
        // Prefer a replay source over the original producer, unless the replay
        // source itself needs backfill.
        if let Some(stream_id) = stream_id
            && let Some(replay_source) = self.circuit.get_replay_source(stream_id)
            && !need_backfill.contains(&replay_source.local_node_id())
        {
            replay_streams.insert(stream_id, replay_source.clone());
            node_id = replay_source.local_node_id();
        }

        // `insert` returns `true` only for nodes that were not participating yet.
        if participate_in_backfill.insert(node_id) {
            frontier.insert(node_id);
        }
    }

    for (original_stream, replay_stream) in replay_streams {
        let source_node = replay_stream.local_node_id();
        // Replay edges are added only the first time a replay source is seen.
        replay_sources.entry(source_node).or_insert_with(|| {
            self.circuit
                .add_replay_edges(original_stream, replay_stream.as_ref());
            replay_stream.stream_id()
        });
    }

    Ok(frontier)
}
/// Returns `true` when no replay is in progress, or when every replay source
/// recorded by `restore` reports that its replay is complete.
pub fn is_replay_complete(&self) -> bool {
    match &self.replay_info {
        None => true,
        Some(info) => info.replay_sources.keys().all(|node_id| {
            self.circuit
                .map_local_node_mut(*node_id, &mut |node| node.is_replay_complete())
        }),
    }
}
/// Leaves replay mode, if a replay is in progress.
///
/// For every replay source recorded by `restore`, ends replay on the node and
/// deletes its replay stream from the circuit's edges; then prepares the
/// executor again with no node restriction. Does nothing when no replay is
/// in progress.
///
/// # Errors
///
/// Propagates errors from ending replay on a node or preparing the executor.
/// The stored replay information is taken out before any of this runs, so it
/// is gone even if an error is returned.
pub fn complete_replay(&mut self) -> Result<(), DbspError> {
    if let Some(finished) = self.replay_info.take() {
        for (&node_id, &stream_id) in &finished.replay_sources {
            self.circuit
                .map_local_node_mut(node_id, &mut |node| node.end_replay())?;
            self.circuit.edges_mut().delete_stream(stream_id);
        }
        self.executor.prepare(&self.circuit, None)?;
    }
    Ok(())
}
/// Computes a fingerprint of the circuit by feeding every node, recursively,
/// into a `Fingerprinter`.
pub fn fingerprint(&self) -> u64 {
    let mut fingerprinter = Fingerprinter::default();

    // The visitor never fails, so the traversal result is deliberately ignored.
    let _ = self.circuit.map_nodes_recursive(&mut |node: &dyn Node| {
        node.fingerprint(&mut fingerprinter);
        Ok(())
    });

    fingerprinter.finish()
}
/// Registers `handler` under `name` as a scheduler event handler on the
/// circuit.
pub fn register_scheduler_event_handler<F>(&self, name: &str, handler: F)
where
F: FnMut(&SchedulerEvent<'_>) + 'static,
{
self.circuit.register_scheduler_event_handler(name, handler);
}
/// Unregisters the scheduler event handler registered under `name` and
/// returns the circuit's result for that request.
// NOTE(review): the returned flag presumably says whether a handler with
// that name existed -- confirm.
pub fn unregister_scheduler_event_handler(&self, name: &str) -> bool {
self.circuit.unregister_scheduler_event_handler(name)
}
/// Returns the LIR representation of the circuit, obtained through
/// `CircuitBase::to_lir`.
pub fn lir(&self) -> LirCircuit {
(&self.circuit as &dyn CircuitBase).to_lir()
}
/// Passes the balancer `hint` for the node `global_node_id` on to the circuit.
///
/// # Errors
///
/// Returns whatever error the circuit reports for the request.
pub fn set_balancer_hint(
&self,
global_node_id: &GlobalNodeId,
hint: BalancerHint,
) -> Result<(), DbspError> {
self.circuit.set_balancer_hint(global_node_id, hint)
}
/// Returns the circuit's current balancer policy, keyed by the global id of
/// each node (the node id placed directly under the root).
pub fn get_current_balancer_policy(&self) -> BTreeMap<GlobalNodeId, PartitioningPolicy> {
    let mut policies = BTreeMap::new();
    for (node_id, policy) in self.circuit.get_current_balancer_policy() {
        policies.insert(GlobalNodeId::root().child(node_id), policy);
    }
    policies
}
/// Asks the circuit to rebalance.
pub fn rebalance(&self) {
self.circuit.rebalance()
}
}
#[cfg(test)]
mod tests {
use crate::{
Circuit, Error as DbspError, RootCircuit,
circuit::schedule::{DynamicScheduler, Scheduler},
monitor::TraceMonitor,
operator::{Generator, Z1},
};
use anyhow::anyhow;
use std::{cell::RefCell, ops::Deref, rc::Rc, vec::Vec};
// Runs the `sum_circuit` scenario with the `DynamicScheduler`.
#[test]
fn sum_circuit_dynamic() {
sum_circuit::<DynamicScheduler>();
}
// Builds a circuit that integrates the sequence 0, 1, 2, ... and checks that
// 100 transactions produce the running sums 0, 1, 3, 6, ...
fn sum_circuit<S>()
where
    S: Scheduler + 'static,
{
    let actual_output: Rc<RefCell<Vec<isize>>> = Rc::new(RefCell::new(Vec::with_capacity(100)));
    let recorded = actual_output.clone();

    let circuit = RootCircuit::build_with_scheduler::<_, _, S>(|circuit| {
        TraceMonitor::new_panic_on_error().attach(circuit, "monitor");

        // Source that yields 0, 1, 2, ... on successive evaluations.
        let mut next: isize = 0;
        let source = circuit.add_source(Generator::new(move || {
            let current = next;
            next += 1;
            current
        }));

        let integrator = source.integrate();
        integrator.inspect(|n| println!("{}", n));
        integrator.inspect(move |n| recorded.borrow_mut().push(*n));
        Ok(())
    })
    .unwrap()
    .0;

    for _ in 0..100 {
        circuit.transaction().unwrap();
    }

    // Running sums of 0..100.
    let expected_output: Vec<isize> = (0..100)
        .scan(0, |running, i| {
            *running += i;
            Some(*running)
        })
        .collect();
    assert_eq!(&expected_output, actual_output.borrow().deref());
}
// Runs the `recursive_sum_circuit` scenario with the `DynamicScheduler`.
#[test]
fn recursive_sum_circuit_dynamic() {
recursive_sum_circuit::<DynamicScheduler>()
}
// Builds a circuit that sums the sequence 0, 1, 2, ... through an explicit
// `Z1` feedback loop and checks that 100 transactions produce the running
// sums 0, 1, 3, 6, ...
fn recursive_sum_circuit<S>()
where
    S: Scheduler + 'static,
{
    let actual_output: Rc<RefCell<Vec<usize>>> = Rc::new(RefCell::new(Vec::with_capacity(100)));
    let recorded = actual_output.clone();

    let circuit = RootCircuit::build_with_scheduler::<_, _, S>(|circuit| {
        TraceMonitor::new_panic_on_error().attach(circuit, "monitor");

        // Source that yields 0, 1, 2, ... on successive evaluations.
        let mut next: usize = 0;
        let source = circuit.add_source(Generator::new(move || {
            let current = next;
            next += 1;
            current
        }));

        // Feedback loop: each sum is fed back through `Z1` and added to the
        // following source value.
        let (z1_output, z1_feedback) = circuit.add_feedback(Z1::new(0));
        let plus = source
            .apply2(&z1_output, |n1: &usize, n2: &usize| *n1 + *n2)
            .inspect(move |n| recorded.borrow_mut().push(*n));
        z1_feedback.connect(&plus);
        Ok(())
    })
    .unwrap()
    .0;

    for _ in 0..100 {
        circuit.transaction().unwrap();
    }

    // Running sums of 0..100.
    let expected_output: Vec<usize> = (0..100)
        .scan(0, |running, i| {
            *running += i;
            Some(*running)
        })
        .collect();
    assert_eq!(&expected_output, actual_output.borrow().deref());
}
// Runs the `factorial` scenario with the `DynamicScheduler`.
#[test]
fn factorial_dynamic() {
factorial::<DynamicScheduler>();
}
fn factorial<S>()
where
S: Scheduler + 'static,
{
let actual_output: Rc<RefCell<Vec<usize>>> = Rc::new(RefCell::new(Vec::with_capacity(100)));
let actual_output_clone = actual_output.clone();
let circuit = RootCircuit::build_with_scheduler::<_, _, S>(|circuit| {
TraceMonitor::new_panic_on_error().attach(circuit, "monitor");
let mut n: usize = 0;
let source = circuit.add_source(Generator::new(move || {
n += 1;
n
}));
let fact = circuit
.iterate_with_condition_and_scheduler::<_, _, S>(|child| {
let mut counter = 0;
let countdown = source.delta0(child).apply_mut(move |parent_val| {
if *parent_val > 0 {
counter = *parent_val;
};
let res = counter;
counter -= 1;
res
});
let (z1_output, z1_feedback) = child.add_feedback_with_export(Z1::new(1));
let mul = countdown.apply2(&z1_output.local, |n1: &usize, n2: &usize| n1 * n2);
z1_feedback.connect(&mul);
Ok((countdown.condition(|n| *n <= 1), z1_output.export))
})
.unwrap();
fact.inspect(move |n| actual_output_clone.borrow_mut().push(*n));
Ok(())
})
.unwrap()
.0;
for _ in 1..10 {
circuit.transaction().unwrap();
}
let mut expected_output: Vec<usize> = Vec::with_capacity(10);
for i in 1..10 {
expected_output.push(my_factorial(i));
}
assert_eq!(&expected_output, actual_output.borrow().deref());
}
// Reference factorial used to compute the expected outputs of the
// `factorial` test.
//
// Computed as the product of `1..=n`, so `my_factorial(0)` is 1 (empty
// product) instead of recursing into `0 - 1` on `usize`.
fn my_factorial(n: usize) -> usize {
    (1..=n).product()
}
// A failing constructor closure must surface as `DbspError::Constructor`
// carrying the original error message.
#[test]
fn init_circuit_constructor_error() {
    let outcome = RootCircuit::build(|_circuit| Err::<(), _>(anyhow!("constructor failed")));

    let Err(DbspError::Constructor(msg)) = outcome else {
        panic!()
    };
    assert_eq!(msg.to_string(), "constructor failed");
}
}