1use crate::{
26 Error as DbspError, Position, Runtime, RuntimeError,
27 circuit::{
28 cache::{CircuitCache, CircuitStoreMarker},
29 fingerprinter::Fingerprinter,
30 metadata::OperatorMeta,
31 metrics::DBSP_OPERATOR_COMMIT_LATENCY_MICROSECONDS,
32 operator_traits::{
33 BinaryOperator, BinarySinkOperator, Data, ImportOperator, NaryOperator,
34 QuaternaryOperator, SinkOperator, SourceOperator, StrictUnaryOperator, TernaryOperator,
35 TernarySinkOperator, UnaryOperator,
36 },
37 runtime::Consensus,
38 schedule::{
39 CommitProgress, DynamicScheduler, Error as SchedulerError, Executor, IterativeExecutor,
40 OnceExecutor, Scheduler,
41 },
42 trace::{CircuitEvent, SchedulerEvent},
43 },
44 circuit_cache_key,
45 ir::LABEL_MIR_NODE_ID,
46 operator::dynamic::balance::{Balancer, BalancerError, BalancerHint, PartitioningPolicy},
47 time::{Timestamp, UnitTimestamp},
48};
49#[cfg(doc)]
50use crate::{
51 InputHandle, OutputHandle,
52 algebra::{IndexedZSet, ZSet},
53 operator::{Aggregator, Fold, Generator, Max, Min, time_series::RelRange},
54 trace::Batch,
55};
56use anyhow::Error as AnyError;
57use dyn_clone::{DynClone, clone_box};
58use feldera_ir::{LirCircuit, LirNodeId};
59use feldera_samply::Span;
60use feldera_storage::{FileCommitter, StoragePath};
61use itertools::Itertools;
62use pin_project_lite::pin_project;
63use serde::{Deserialize, Serialize, Serializer, de::DeserializeOwned};
64use std::{
65 any::{Any, TypeId, type_name_of_val},
66 borrow::Cow,
67 cell::{Cell, Ref, RefCell, RefMut},
68 collections::{BTreeMap, BTreeSet, HashMap},
69 fmt::{self, Debug, Display, Write},
70 future::Future,
71 io::ErrorKind,
72 marker::PhantomData,
73 mem::{take, transmute},
74 ops::Deref,
75 panic::Location,
76 pin::Pin,
77 rc::Rc,
78 sync::Arc,
79 task::{Context, Poll},
80 thread::panicking,
81 time::{Duration, Instant},
82};
83use tokio::{runtime::Runtime as TokioRuntime, task::LocalSet};
84use tracing::debug;
85use typedmap::{TypedMap, TypedMapKey};
86
87use super::dbsp_handle::Mode;
88
89const LABEL_PERSISTENT_OPERATOR_ID: &str = "persistent_id";
92
93struct StreamValue<D> {
95 val: Option<D>,
98
99 consumers: usize,
104
105 tokens: Cell<usize>,
112}
113
114impl<D> StreamValue<D> {
115 const fn empty() -> Self {
116 Self {
117 val: None,
118 consumers: 0,
119 tokens: Cell::new(0),
120 }
121 }
122
123 fn put(&mut self, val: D) {
124 debug_assert!(self.val.is_none());
128
129 if self.consumers > 0 {
132 self.tokens = Cell::new(self.consumers);
133 self.val = Some(val);
134 }
135 }
136
137 fn peek<R>(this: &R) -> &D
139 where
140 R: Deref<Target = Self>,
141 {
142 debug_assert_ne!(this.tokens.get(), 0);
143
144 this.val.as_ref().unwrap()
145 }
146
147 fn take(this: &RefCell<Self>) -> Option<D>
150 where
151 D: Clone,
152 {
153 let tokens = this.borrow().tokens.get();
154 debug_assert_ne!(tokens, 0);
155
156 if tokens == 1 {
157 Some(this.borrow_mut().val.take().unwrap())
158 } else {
159 None
160 }
161 }
162
163 fn consume_token(this: &RefCell<Self>) {
169 let this_ref = this.borrow();
170 debug_assert_ne!(this_ref.tokens.get(), 0);
171 this_ref.tokens.update(|tokens| tokens - 1);
172 if this_ref.tokens.get() == 0 {
173 drop(this_ref);
175 this.borrow_mut().val.take();
176 }
177 }
178}
179
180#[repr(transparent)]
181pub struct RefStreamValue<D>(Rc<RefCell<StreamValue<D>>>);
182
183impl<D> Clone for RefStreamValue<D> {
184 fn clone(&self) -> Self {
185 Self(self.0.clone())
186 }
187}
188
189impl<D> RefStreamValue<D> {
190 pub fn empty() -> Self {
191 Self(Rc::new(RefCell::new(StreamValue::empty())))
192 }
193
194 fn get_mut(&self) -> RefMut<'_, StreamValue<D>> {
195 self.0.borrow_mut()
196 }
197
198 fn get(&self) -> Ref<'_, StreamValue<D>> {
199 self.0.borrow()
200 }
201
202 pub fn put(&self, d: D) {
208 let mut val = self.get_mut();
209 val.put(d);
210 }
211
212 unsafe fn transmute<D2>(&self) -> RefStreamValue<D2> {
213 unsafe {
214 RefStreamValue(std::mem::transmute::<
215 Rc<RefCell<StreamValue<D>>>,
216 Rc<RefCell<StreamValue<D2>>>,
217 >(self.0.clone()))
218 }
219 }
220}
221
222pub trait StreamMetadata: DynClone + 'static {
227 fn stream_id(&self) -> StreamId;
228 fn local_node_id(&self) -> NodeId;
229 fn origin_node_id(&self) -> &GlobalNodeId;
230 fn num_consumers(&self) -> usize;
231
232 fn clear_consumer_count(&self);
234
235 fn register_consumer(&self);
238
239 fn consume_token(&self);
248}
249
250dyn_clone::clone_trait_object!(StreamMetadata);
251
252pub struct Stream<C, D> {
701 stream_id: StreamId,
703 local_node_id: NodeId,
705 origin_node_id: GlobalNodeId,
707 circuit: C,
709 val: RefStreamValue<D>,
712}
713
714impl<C, D> StreamMetadata for Stream<C, D>
715where
716 C: Clone + 'static,
717 D: 'static,
718{
719 fn stream_id(&self) -> StreamId {
720 self.stream_id
721 }
722 fn local_node_id(&self) -> NodeId {
723 self.local_node_id
724 }
725 fn origin_node_id(&self) -> &GlobalNodeId {
726 &self.origin_node_id
727 }
728 fn clear_consumer_count(&self) {
729 self.val.get_mut().consumers = 0;
730 }
731 fn num_consumers(&self) -> usize {
732 self.val.get().consumers
733 }
734 fn register_consumer(&self) {
735 self.val.get_mut().consumers += 1;
736 }
737 fn consume_token(&self) {
738 StreamValue::consume_token(self.val());
739 }
740}
741
742impl<C, D> Clone for Stream<C, D>
743where
744 C: Clone,
745{
746 fn clone(&self) -> Self {
747 Self {
748 stream_id: self.stream_id,
749 local_node_id: self.local_node_id,
750 origin_node_id: self.origin_node_id.clone(),
751 circuit: self.circuit.clone(),
752 val: self.val.clone(),
753 }
754 }
755}
756
757impl<C, D> Stream<C, D>
758where
759 C: Clone,
760{
761 pub(crate) unsafe fn transmute_payload<D2>(&self) -> Stream<C, D2> {
770 unsafe {
771 Stream {
772 stream_id: self.stream_id,
773 local_node_id: self.local_node_id,
774 origin_node_id: self.origin_node_id.clone(),
775 circuit: self.circuit.clone(),
776 val: self.val.transmute::<D2>(),
777 }
778 }
779 }
780}
781
782impl<C, D> Stream<C, D> {
783 pub fn local_node_id(&self) -> NodeId {
789 self.local_node_id
790 }
791
792 pub fn origin_node_id(&self) -> &GlobalNodeId {
798 &self.origin_node_id
799 }
800
801 pub fn stream_id(&self) -> StreamId {
802 self.stream_id
803 }
804
805 pub fn circuit(&self) -> &C {
807 &self.circuit
808 }
809
810 pub fn ptr_eq<D2>(&self, other: &Stream<C, D2>) -> bool {
811 self.stream_id() == other.stream_id()
812 }
813}
814
815impl<C, D> Stream<C, D>
817where
818 C: Circuit,
819{
820 pub(crate) fn new(circuit: C, node_id: NodeId) -> Self {
823 Self {
824 stream_id: circuit.allocate_stream_id(),
825 local_node_id: node_id,
826 origin_node_id: GlobalNodeId::child_of(&circuit, node_id),
827 circuit,
828 val: RefStreamValue::empty(),
829 }
830 }
831
832 pub fn with_value(circuit: C, node_id: NodeId, val: RefStreamValue<D>) -> Self {
834 Self {
835 stream_id: circuit.allocate_stream_id(),
836 local_node_id: node_id,
837 origin_node_id: GlobalNodeId::child_of(&circuit, node_id),
838 circuit,
839 val,
840 }
841 }
842
843 pub fn value(&self) -> RefStreamValue<D> {
844 self.val.clone()
845 }
846
847 pub fn export(&self) -> Stream<C::Parent, D>
855 where
856 C::Parent: Circuit,
857 D: 'static,
858 {
859 self.circuit()
860 .cache_get_or_insert_with(ExportId::new(self.stream_id()), || unimplemented!())
861 .clone()
862 }
863
864 pub fn set_label(&self, key: &str, val: &str) -> Self {
866 self.circuit.set_node_label(&self.origin_node_id, key, val);
867 self.clone()
868 }
869
870 pub fn get_label(&self, key: &str) -> Option<String> {
872 self.circuit.get_node_label(&self.origin_node_id, key)
873 }
874
875 pub fn set_persistent_id(&self, name: Option<&str>) -> Self {
877 if let Some(name) = name {
878 self.set_label(LABEL_PERSISTENT_OPERATOR_ID, name)
879 } else {
880 self.clone()
881 }
882 }
883
884 pub fn get_persistent_id(&self) -> Option<String> {
886 self.get_label(LABEL_PERSISTENT_OPERATOR_ID)
887 }
888}
889
890impl<C, D> Stream<C, D> {
891 fn with_origin(
893 circuit: C,
894 stream_id: StreamId,
895 node_id: NodeId,
896 origin_node_id: GlobalNodeId,
897 ) -> Self {
898 Self {
899 stream_id,
900 local_node_id: node_id,
901 origin_node_id,
902 circuit,
903 val: RefStreamValue::empty(),
904 }
905 }
906}
907
908impl<C, D> Stream<C, D> {
909 pub(crate) fn map_value<T>(&self, f: impl Fn(&D) -> T) -> T {
910 f(StreamValue::peek(&self.get()))
911 }
912
913 fn get(&self) -> Ref<'_, StreamValue<D>> {
914 self.val.get()
915 }
916
917 fn val(&self) -> &RefCell<StreamValue<D>> {
918 &self.val.0
919 }
920
921 pub(crate) fn put(&self, d: D) {
928 self.val.put(d);
929 }
930}
931
932pub struct ExportStream<C, D>
941where
942 C: Circuit,
943{
944 pub local: Stream<C, D>,
945 pub export: Stream<C::Parent, D>,
946}
947
948pub type Scope = u16;
954
955pub trait Node: Any {
958 fn local_id(&self) -> NodeId;
960
961 fn global_id(&self) -> &GlobalNodeId;
963
964 fn persistent_id(&self) -> Option<String> {
977 let worker_index = Runtime::worker_index();
978
979 match Runtime::mode() {
980 Mode::Ephemeral => Some(format!(
981 "{worker_index}-{}",
982 self.global_id().path_as_string()
983 )),
984 Mode::Persistent => self
985 .get_label(LABEL_PERSISTENT_OPERATOR_ID)
986 .map(|operator_id| format!("{worker_index}-{operator_id}")),
987 }
988 }
989
990 fn name(&self) -> Cow<'static, str>;
992
993 fn is_circuit(&self) -> bool {
994 false
995 }
996
997 fn is_input(&self) -> bool;
999
1000 fn is_async(&self) -> bool;
1004
1005 fn ready(&self) -> bool;
1009
1010 fn register_ready_callback(&mut self, _cb: Box<dyn Fn() + Send + Sync>) {}
1014
1015 fn eval<'a>(
1019 &'a mut self,
1020 ) -> Pin<Box<dyn Future<Output = Result<Option<Position>, SchedulerError>> + 'a>>;
1021
1022 fn import<'a>(&'a mut self) -> Pin<Box<dyn Future<Output = ()> + 'a>> {
1023 Box::pin(async {})
1024 }
1025
1026 fn start_transaction(&mut self);
1028
1029 fn flush(&mut self);
1032
1033 fn is_flush_complete(&self) -> bool;
1035
1036 fn clock_start(&mut self, scope: Scope);
1046
1047 fn clock_end(&mut self, scope: Scope);
1055
1056 fn init(&mut self) {}
1057
1058 fn metadata(&self, output: &mut OperatorMeta);
1059
1060 fn fixedpoint(&self, scope: Scope) -> bool;
1061
1062 fn map_nodes_recursive(
1065 &self,
1066 _f: &mut dyn FnMut(&dyn Node) -> Result<(), DbspError>,
1067 ) -> Result<(), DbspError> {
1068 Ok(())
1069 }
1070
1071 fn map_nodes_recursive_mut(
1074 &self,
1075 _f: &mut dyn FnMut(&mut dyn Node) -> Result<(), DbspError>,
1076 ) -> Result<(), DbspError> {
1077 Ok(())
1078 }
1079
1080 fn checkpoint(
1086 &mut self,
1087 base: &StoragePath,
1088 files: &mut Vec<Arc<dyn FileCommitter>>,
1089 ) -> Result<(), DbspError>;
1090
1091 fn restore(&mut self, base: &StoragePath) -> Result<(), DbspError>;
1094
1095 fn clear_state(&mut self) -> Result<(), DbspError>;
1101
1102 fn start_compaction(&mut self);
1104
1105 fn start_replay(&mut self) -> Result<(), DbspError>;
1114
1115 fn is_replay_complete(&self) -> bool;
1121
1122 fn end_replay(&mut self) -> Result<(), DbspError>;
1130
1131 fn fingerprint(&self, fip: &mut Fingerprinter) {
1133 fip.hash(type_name_of_val(self));
1134 }
1135
1136 fn set_label(&mut self, key: &str, value: &str);
1138
1139 fn get_label(&self, key: &str) -> Option<&str>;
1141
1142 fn labels(&self) -> &BTreeMap<String, String>;
1143
1144 fn map_child(&self, _path: &[NodeId], _f: &mut dyn FnMut(&dyn Node)) {
1146 panic!("map_child: not a circuit node")
1147 }
1148
1149 fn map_child_mut(&self, _path: &[NodeId], _f: &mut dyn FnMut(&mut dyn Node)) {
1151 panic!("map_child_mut: not a circuit node")
1152 }
1153
1154 fn as_circuit(&self) -> Option<&dyn CircuitBase> {
1155 None
1156 }
1157
1158 fn as_any(&self) -> &dyn Any;
1159}
1160
1161#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord, Hash, Deserialize, Serialize)]
1163#[repr(transparent)]
1164pub struct StreamId(usize);
1165
1166impl StreamId {
1167 pub fn new(id: usize) -> Self {
1168 Self(id)
1169 }
1170
1171 pub fn id(&self) -> usize {
1173 self.0
1174 }
1175}
1176
1177impl Display for StreamId {
1178 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
1179 f.write_char('s')?;
1180 Debug::fmt(&self.0, f)
1181 }
1182}
1183
1184#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord, Hash, Serialize, Deserialize)]
1186#[repr(transparent)]
1187pub struct NodeId(usize);
1188
1189impl NodeId {
1190 pub fn new(id: usize) -> Self {
1191 Self(id)
1192 }
1193
1194 pub fn id(&self) -> usize {
1196 self.0
1197 }
1198
1199 pub(super) fn root() -> Self {
1200 Self(0)
1201 }
1202}
1203
1204impl Display for NodeId {
1205 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
1206 f.write_char('n')?;
1207 Debug::fmt(&self.0, f)
1208 }
1209}
1210
1211#[derive(Debug, Clone, Eq, Hash, PartialEq)]
1213pub struct RegionName {
1214 id: u64,
1216 pub name: Cow<'static, str>,
1217}
1218
1219impl RegionName {
1220 fn new(name: &str, id: u64) -> Self {
1221 Self {
1222 id,
1223 name: Cow::Owned(name.to_string()),
1224 }
1225 }
1226
1227 pub fn id(&self) -> u64 {
1229 self.id
1230 }
1231
1232 pub fn name(&self) -> &str {
1234 &self.name
1235 }
1236}
1237
1238impl Display for RegionName {
1239 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
1240 f.write_str(self.name.as_ref())?;
1241 f.write_char(':')?;
1242 write!(f, "{}", self.id)
1243 }
1244}
1245
1246#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord, Hash)]
1254#[repr(transparent)]
1255pub struct GlobalNodeId(Vec<NodeId>);
1256
1257impl Serialize for GlobalNodeId {
1258 fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
1260 where
1261 S: Serializer,
1262 {
1263 let s = self.node_identifier().to_string();
1264 serializer.serialize_str(&s)
1265 }
1266}
1267
1268impl Display for GlobalNodeId {
1269 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
1270 f.write_str("[")?;
1271 let path = self.path();
1272 for i in 0..path.len() {
1273 f.write_str(&path[i].0.to_string())?;
1274 if i < path.len() - 1 {
1275 f.write_str(".")?;
1276 }
1277 }
1278 f.write_str("]")
1279 }
1280}
1281
1282impl GlobalNodeId {
1283 pub fn from_path(path: &[NodeId]) -> Self {
1285 Self(path.to_owned())
1286 }
1287
1288 pub fn from_path_vec(path: Vec<NodeId>) -> Self {
1290 Self(path)
1291 }
1292
1293 pub fn root() -> Self {
1294 Self(Vec::new())
1295 }
1296
1297 pub fn child(&self, child_id: NodeId) -> Self {
1299 let mut path = Vec::with_capacity(self.path().len() + 1);
1300 for id in self.path() {
1301 path.push(*id);
1302 }
1303 path.push(child_id);
1304 Self(path)
1305 }
1306
1307 pub fn child_of<C>(circuit: &C, node_id: NodeId) -> Self
1309 where
1310 C: Circuit,
1311 {
1312 let mut ids = circuit.global_node_id().path().to_owned();
1313 ids.push(node_id);
1314 Self(ids)
1315 }
1316
1317 pub fn node_identifier(&self) -> impl Display {
1319 struct NodeIdentifier<'a>(&'a GlobalNodeId);
1320 impl<'a> Display for NodeIdentifier<'a> {
1321 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
1322 write!(f, "n{}", self.0.path().iter().format("_"))
1323 }
1324 }
1325 NodeIdentifier(self)
1326 }
1327
1328 pub fn local_node_id(&self) -> Option<NodeId> {
1330 self.0.last().cloned()
1331 }
1332
1333 pub fn parent_id(&self) -> Option<Self> {
1335 self.0
1336 .split_last()
1337 .map(|(_, prefix)| GlobalNodeId::from_path(prefix))
1338 }
1339
1340 pub fn is_child_of(&self, parent: &Self) -> bool {
1342 self.parent_id().as_ref() == Some(parent)
1343 }
1344
1345 pub fn path(&self) -> &[NodeId] {
1347 &self.0
1348 }
1349
1350 pub fn top_level_ancestor(&self) -> NodeId {
1359 self.0[0]
1360 }
1361
1362 pub(crate) fn path_as_string(&self) -> String {
1364 self.0
1365 .iter()
1366 .map(|node_id| node_id.0.to_string())
1367 .collect::<Vec<_>>()
1368 .join("-")
1369 }
1370
1371 pub fn lir_node_id(&self) -> LirNodeId {
1373 LirNodeId::new(&self.path_as_string())
1374 }
1375}
1376
1377type CircuitEventHandler = Box<dyn Fn(&CircuitEvent)>;
1378type SchedulerEventHandler = Box<dyn FnMut(&SchedulerEvent<'_>)>;
1379type CircuitEventHandlers = Rc<RefCell<HashMap<String, CircuitEventHandler>>>;
1380type SchedulerEventHandlers = Rc<RefCell<HashMap<String, SchedulerEventHandler>>>;
1381
1382#[derive(Clone, Copy, PartialEq, Eq, PartialOrd, Ord, Debug, Hash)]
1421#[repr(transparent)]
1422pub struct OwnershipPreference(usize);
1423
1424impl OwnershipPreference {
1425 pub const fn new(val: usize) -> Self {
1428 Self(val)
1429 }
1430
1431 pub const INDIFFERENT: Self = Self::new(0);
1433
1434 pub const WEAKLY_PREFER_OWNED: Self = Self::new(40);
1440
1441 pub const PREFER_OWNED: Self = Self::new(50);
1447
1448 pub const STRONGLY_PREFER_OWNED: Self = Self::new(100);
1451
1452 pub const fn raw(&self) -> usize {
1454 self.0
1455 }
1456}
1457
1458impl Default for OwnershipPreference {
1459 #[inline]
1460 fn default() -> Self {
1461 Self::INDIFFERENT
1462 }
1463}
1464
1465impl Display for OwnershipPreference {
1466 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
1467 match *self {
1468 Self::INDIFFERENT => f.write_str("Indifferent"),
1469 Self::WEAKLY_PREFER_OWNED => f.write_str("WeaklyPreferOwned"),
1470 Self::PREFER_OWNED => f.write_str("PreferOwned"),
1471 Self::STRONGLY_PREFER_OWNED => f.write_str("StronglyPreferOwned"),
1472 Self(preference) => write!(f, "Preference({preference})"),
1473 }
1474 }
1475}
1476
1477#[derive(Clone)]
1482pub struct Edge {
1483 pub from: NodeId,
1485 pub to: NodeId,
1487 pub origin: GlobalNodeId,
1491 pub stream: Option<Box<dyn StreamMetadata>>,
1494 pub ownership_preference: Option<OwnershipPreference>,
1497}
1498
1499#[allow(dead_code)]
1500impl Edge {
1501 pub(crate) fn is_dependency(&self) -> bool {
1503 self.ownership_preference.is_none()
1504 }
1505
1506 pub(crate) fn is_stream(&self) -> bool {
1508 self.stream.is_some()
1509 }
1510
1511 pub(crate) fn stream_id(&self) -> Option<StreamId> {
1512 self.stream.as_ref().map(|meta| meta.stream_id())
1513 }
1514}
1515
1516circuit_cache_key!(ExportId<C, D>(StreamId => Stream<C, D>));
1517
1518circuit_cache_key!(ReplaySource(StreamId => Box<dyn StreamMetadata>));
1521
1522pub(crate) fn register_replay_stream<C, B>(
1524 circuit: &C,
1525 stream: &Stream<C, B>,
1526 replay_stream: &Stream<C, B>,
1527) where
1528 C: Circuit,
1529 B: 'static,
1530{
1531 if TypeId::of::<()>() == TypeId::of::<C::Time>() {
1534 if !circuit.cache_contains(&ReplaySource::new(stream.stream_id())) {
1543 circuit.cache_insert(
1544 ReplaySource::new(stream.stream_id()),
1545 Box::new(replay_stream.clone()),
1546 );
1547 }
1548 }
1549}
1550
1551pub trait WithClock {
1554 type Time: Timestamp;
1557
1558 fn time(&self) -> Self::Time;
1560}
1561
1562impl WithClock for () {
1566 type Time = UnitTimestamp;
1567
1568 fn time(&self) -> Self::Time {
1569 UnitTimestamp
1570 }
1571}
1572
1573impl<P, T> WithClock for ChildCircuit<P, T>
1574where
1575 P: 'static,
1576 T: Timestamp,
1577{
1578 type Time = T;
1579
1580 fn time(&self) -> Self::Time {
1581 self.time.borrow().clone()
1582 }
1583}
1584
1585#[derive(Default, Debug, Clone, Serialize, Deserialize)]
1587pub struct CircuitMetadata {
1588 metadata: HashMap<NodeId, serde_json::Value>,
1589}
1590
1591#[derive(Default, Debug)]
1592pub struct MetadataExchangeInner {
1593 local_metadata: RefCell<CircuitMetadata>,
1595
1596 global_metadata: RefCell<Vec<CircuitMetadata>>,
1599}
1600
1601#[derive(Default, Debug, Clone)]
1611pub struct MetadataExchange {
1612 inner: Rc<MetadataExchangeInner>,
1613}
1614
1615impl MetadataExchange {
1616 fn new() -> Self {
1617 Self::default()
1618 }
1619
1620 pub fn local_metadata(&self) -> CircuitMetadata {
1622 self.inner.local_metadata.borrow().clone()
1623 }
1624
1625 pub fn set_local_operator_metadata(&self, id: NodeId, metadata: serde_json::Value) {
1627 self.inner
1628 .local_metadata
1629 .borrow_mut()
1630 .metadata
1631 .insert(id, metadata.clone());
1632 }
1633
1634 pub fn clear_local_operator_metadata(&self, id: NodeId) {
1636 self.inner.local_metadata.borrow_mut().metadata.remove(&id);
1637 }
1638
1639 pub fn set_local_operator_metadata_typed<T>(&self, id: NodeId, metadata: T)
1641 where
1642 T: Serialize,
1643 {
1644 self.inner
1645 .local_metadata
1646 .borrow_mut()
1647 .metadata
1648 .insert(id, serde_json::to_value(metadata).unwrap());
1649 }
1650
1651 pub fn get_local_operator_metadata(&self, id: NodeId) -> Option<serde_json::Value> {
1653 self.inner
1654 .local_metadata
1655 .borrow()
1656 .metadata
1657 .get(&id)
1658 .cloned()
1659 }
1660
1661 pub fn get_local_operator_metadata_typed<T>(&self, id: NodeId) -> Option<T>
1662 where
1663 T: DeserializeOwned,
1664 {
1665 self.get_local_operator_metadata(id)
1666 .map(|val| serde_json::from_value::<T>(val).unwrap())
1667 }
1668
1669 pub fn set_global_metadata(&self, global_metadata: Vec<CircuitMetadata>) {
1671 *self.inner.global_metadata.borrow_mut() = global_metadata;
1672 }
1673
1674 pub fn get_global_metadata(&self) -> Vec<CircuitMetadata> {
1675 self.inner.global_metadata.borrow().clone()
1676 }
1677
1678 pub fn get_global_operator_metadata(&self, id: NodeId) -> Vec<Option<serde_json::Value>> {
1680 self.inner
1681 .global_metadata
1682 .borrow()
1683 .iter()
1684 .map(|global_metadata| global_metadata.metadata.get(&id).cloned())
1685 .collect()
1686 }
1687
1688 pub fn get_global_operator_metadata_typed<T>(&self, id: NodeId) -> Vec<Option<T>>
1695 where
1696 T: DeserializeOwned,
1697 {
1698 self.inner
1699 .global_metadata
1700 .borrow()
1701 .iter()
1702 .map(|global_metadata| {
1703 global_metadata
1704 .metadata
1705 .get(&id)
1706 .cloned()
1707 .map(|val| serde_json::from_value::<T>(val).unwrap())
1708 })
1709 .collect()
1710 }
1711}
1712
1713pub trait CircuitBase: 'static {
1715 fn edges(&self) -> Ref<'_, Edges>;
1716
1717 fn edges_mut(&self) -> RefMut<'_, Edges>;
1718
1719 fn global_id(&self) -> &GlobalNodeId;
1723
1724 fn num_nodes(&self) -> usize;
1726
1727 fn node_ids(&self) -> Vec<NodeId>;
1729
1730 fn lookup_local_node_by_persistent_id(
1731 &self,
1732 persistent_id: &str,
1733 ) -> Result<GlobalNodeId, DbspError>;
1734
1735 fn import_nodes(&self) -> Vec<NodeId>;
1736
1737 fn clear(&mut self);
1738
1739 fn add_dependency(&self, from: NodeId, to: NodeId);
1744
1745 fn transitive_ancestors(&self) -> BTreeMap<NodeId, BTreeSet<NodeId>>;
1748
1749 fn allocate_stream_id(&self) -> StreamId;
1752
1753 fn last_stream_id(&self) -> RefCell<StreamId>;
1755
1756 fn root_scope(&self) -> Scope;
1761
1762 fn node_id(&self) -> NodeId;
1764
1765 fn global_node_id(&self) -> GlobalNodeId;
1767
1768 fn map_node_relative(&self, path: &[NodeId], f: &mut dyn FnMut(&dyn Node));
1770
1771 fn map_node_mut_relative(&self, path: &[NodeId], f: &mut dyn FnMut(&mut dyn Node));
1773
1774 fn map_nodes_recursive(
1778 &self,
1779 f: &mut dyn FnMut(&dyn Node) -> Result<(), DbspError>,
1780 ) -> Result<(), DbspError>;
1781
1782 fn map_nodes_recursive_mut(
1786 &mut self,
1787 f: &mut dyn FnMut(&mut dyn Node) -> Result<(), DbspError>,
1788 ) -> Result<(), DbspError>;
1789
1790 fn map_local_nodes(
1794 &self,
1795 f: &mut dyn FnMut(&dyn Node) -> Result<(), DbspError>,
1796 ) -> Result<(), DbspError>;
1797
1798 fn map_local_nodes_mut(
1800 &self,
1801 f: &mut dyn FnMut(&mut dyn Node) -> Result<(), DbspError>,
1802 ) -> Result<(), DbspError>;
1803
1804 fn apply_local_node_mut(&self, id: NodeId, f: &mut dyn FnMut(&mut dyn Node));
1810
1811 fn map_subcircuits(
1815 &self,
1816 f: &mut dyn FnMut(&dyn CircuitBase) -> Result<(), DbspError>,
1817 ) -> Result<(), DbspError>;
1818
1819 fn set_node_label(&self, id: &GlobalNodeId, key: &str, val: &str);
1826
1827 fn set_persistent_node_id(&self, id: &GlobalNodeId, persistent_id: Option<&str>) {
1828 if let Some(persistent_id) = persistent_id {
1829 self.set_node_label(id, LABEL_PERSISTENT_OPERATOR_ID, persistent_id);
1830 }
1831 }
1832
1833 fn set_mir_node_id(&self, id: &GlobalNodeId, mir_id: Option<&str>) {
1834 if let Some(mir_id) = mir_id {
1835 self.set_node_label(id, LABEL_MIR_NODE_ID, mir_id);
1836 }
1837 }
1838
1839 fn get_node_label(&self, id: &GlobalNodeId, key: &str) -> Option<String>;
1846
1847 fn get_persistent_node_id(&self, id: &GlobalNodeId) -> Option<String> {
1849 self.get_node_label(id, LABEL_PERSISTENT_OPERATOR_ID)
1850 }
1851
1852 fn check_fixedpoint(&self, scope: Scope) -> bool;
1853
1854 fn metadata_exchange(&self) -> &MetadataExchange;
1856
1857 fn balancer(&self) -> &Balancer;
1859
1860 fn set_auto_rebalance(&self, enable: bool) -> Result<(), DbspError>;
1862
1863 fn set_balancer_hint_by_global_id(
1870 &self,
1871 global_node_id: &GlobalNodeId,
1872 hint: BalancerHint,
1873 ) -> Result<(), DbspError>;
1874
1875 fn set_balancer_hint(&self, persistent_id: &str, hint: BalancerHint) -> Result<(), DbspError>;
1876
1877 fn get_current_balancer_policies(&self) -> BTreeMap<NodeId, PartitioningPolicy>;
1879
1880 fn get_current_balancer_policy(
1881 &self,
1882 persistent_id: &str,
1883 ) -> Result<PartitioningPolicy, DbspError>;
1884
1885 fn rebalance(&self);
1886
1887 fn start_compaction(&self);
1888}
1889
1890pub trait Circuit: CircuitBase + Clone + WithClock {
1903 type Parent;
1905
1906 fn parent(&self) -> Self::Parent;
1908
1909 fn root_circuit(&self) -> RootCircuit;
1911
1912 fn ptr_eq(this: &Self, other: &Self) -> bool;
1914
1915 fn circuit_event_handlers(&self) -> CircuitEventHandlers;
1917
1918 fn scheduler_event_handlers(&self) -> SchedulerEventHandlers;
1920
1921 fn log_circuit_event(&self, event: &CircuitEvent);
1923
1924 fn log_scheduler_event(&self, event: &SchedulerEvent<'_>);
1926
1927 fn map_node<T>(&self, id: &GlobalNodeId, f: &mut dyn FnMut(&dyn Node) -> T) -> T;
1934
1935 fn map_node_mut<T>(&self, id: &GlobalNodeId, f: &mut dyn FnMut(&mut dyn Node) -> T) -> T;
1942
1943 fn map_local_node_mut<T>(&self, id: NodeId, f: &mut dyn FnMut(&mut dyn Node) -> T) -> T;
1949
1950 fn cache_get_or_insert_with<K, F>(&self, key: K, f: F) -> RefMut<'_, K::Value>
1955 where
1956 K: 'static + TypedMapKey<CircuitStoreMarker>,
1957 F: FnMut() -> K::Value;
1958
1959 fn tick(&self);
1962
1963 fn clock_start(&self, scope: Scope);
1965
1966 fn clock_end(&self, scope: Scope);
1968
1969 fn ready(&self, id: NodeId) -> bool;
1972
1973 fn cache_insert<K>(&self, key: K, val: K::Value)
1978 where
1979 K: TypedMapKey<CircuitStoreMarker> + 'static;
1980
1981 fn cache_contains<K>(&self, key: &K) -> bool
1982 where
1983 K: TypedMapKey<CircuitStoreMarker> + 'static;
1984
1985 fn cache_get<K>(&self, key: &K) -> Option<K::Value>
1986 where
1987 K: TypedMapKey<CircuitStoreMarker> + 'static,
1988 K::Value: Clone;
1989
1990 fn get_replay_source(&self, stream_id: StreamId) -> Option<Box<dyn StreamMetadata>> {
1993 self.cache_get(&ReplaySource::new(stream_id))
1994 }
1995
1996 fn add_replay_edges(&self, stream_id: StreamId, replay_stream: &dyn StreamMetadata);
1999
2000 fn connect_stream<T: 'static>(
2002 &self,
2003 stream: &Stream<Self, T>,
2004 to: NodeId,
2005 ownership_preference: OwnershipPreference,
2006 );
2007
2008 fn register_ready_callback(&self, id: NodeId, cb: Box<dyn Fn() + Send + Sync>);
2009
2010 fn is_async_node(&self, id: NodeId) -> bool;
2011
2012 fn eval_node(
2016 &self,
2017 id: NodeId,
2018 ) -> impl Future<Output = Result<Option<Position>, SchedulerError>>;
2019
2020 fn eval_import_node(&self, id: NodeId) -> impl Future<Output = ()>;
2022
2023 fn flush_node(&self, id: NodeId);
2024
2025 fn is_flush_complete(&self, id: NodeId) -> bool;
2026
2027 #[track_caller]
2035 fn region<F, T>(&self, name: &str, f: F) -> T
2036 where
2037 F: FnOnce() -> T;
2038
2039 fn create_region_name(&self, name: &str, id: u64) -> RegionName {
2047 RegionName::new(name, id)
2048 }
2049
2050 #[track_caller]
2063 fn open_region(&self, name: RegionName);
2064
2065 fn close_region(&self, name: RegionName);
2071
2072 fn add_preprocessor(&self, preprocessor_node_id: NodeId);
2076
2077 fn add_source<O, Op>(&self, operator: Op) -> Stream<Self, O>
2079 where
2080 O: Data,
2081 Op: SourceOperator<O>;
2082
2083 fn add_exchange<I, SndOp, O, RcvOp>(
2111 &self,
2112 sender: SndOp,
2113 receiver: RcvOp,
2114 input_stream: &Stream<Self, I>,
2115 ) -> Stream<Self, O>
2116 where
2117 I: Data,
2118 O: Data,
2119 SndOp: SinkOperator<I>,
2120 RcvOp: SourceOperator<O>;
2121
2122 fn add_exchange_with_preference<I, SndOp, O, RcvOp>(
2125 &self,
2126 sender: SndOp,
2127 receiver: RcvOp,
2128 input_stream: &Stream<Self, I>,
2129 input_preference: OwnershipPreference,
2130 ) -> Stream<Self, O>
2131 where
2132 I: Data,
2133 O: Data,
2134 SndOp: SinkOperator<I>,
2135 RcvOp: SourceOperator<O>;
2136
2137 fn add_sink<I, Op>(&self, operator: Op, input_stream: &Stream<Self, I>) -> GlobalNodeId
2139 where
2140 I: Data,
2141 Op: SinkOperator<I>;
2142
2143 fn add_sink_with_preference<I, Op>(
2146 &self,
2147 operator: Op,
2148 input_stream: &Stream<Self, I>,
2149 input_preference: OwnershipPreference,
2150 ) -> GlobalNodeId
2151 where
2152 I: Data,
2153 Op: SinkOperator<I>;
2154
2155 fn add_binary_sink<I1, I2, Op>(
2157 &self,
2158 operator: Op,
2159 input_stream1: &Stream<Self, I1>,
2160 input_stream2: &Stream<Self, I2>,
2161 ) where
2162 I1: Data,
2163 I2: Data,
2164 Op: BinarySinkOperator<I1, I2>;
2165
2166 fn add_binary_sink_with_preference<I1, I2, Op>(
2170 &self,
2171 operator: Op,
2172 input_stream1: (&Stream<Self, I1>, OwnershipPreference),
2173 input_stream2: (&Stream<Self, I2>, OwnershipPreference),
2174 ) where
2175 I1: Data,
2176 I2: Data,
2177 Op: BinarySinkOperator<I1, I2>;
2178
2179 fn add_ternary_sink<I1, I2, I3, Op>(
2181 &self,
2182 operator: Op,
2183 input_stream1: &Stream<Self, I1>,
2184 input_stream2: &Stream<Self, I2>,
2185 input_stream3: &Stream<Self, I3>,
2186 ) -> GlobalNodeId
2187 where
2188 I1: Data,
2189 I2: Data,
2190 I3: Data,
2191 Op: TernarySinkOperator<I1, I2, I3>;
2192
2193 fn add_ternary_sink_with_preference<I1, I2, I3, Op>(
2196 &self,
2197 operator: Op,
2198 input_stream1: (&Stream<Self, I1>, OwnershipPreference),
2199 input_stream2: (&Stream<Self, I2>, OwnershipPreference),
2200 input_stream3: (&Stream<Self, I3>, OwnershipPreference),
2201 ) -> GlobalNodeId
2202 where
2203 I1: Data,
2204 I2: Data,
2205 I3: Data,
2206 Op: TernarySinkOperator<I1, I2, I3>;
2207
2208 fn add_unary_operator<I, O, Op>(
2210 &self,
2211 operator: Op,
2212 input_stream: &Stream<Self, I>,
2213 ) -> Stream<Self, O>
2214 where
2215 I: Data,
2216 O: Data,
2217 Op: UnaryOperator<I, O>;
2218
2219 fn add_unary_operator_with_preference<I, O, Op>(
2222 &self,
2223 operator: Op,
2224 input_stream: &Stream<Self, I>,
2225 input_preference: OwnershipPreference,
2226 ) -> Stream<Self, O>
2227 where
2228 I: Data,
2229 O: Data,
2230 Op: UnaryOperator<I, O>;
2231
2232 fn add_binary_operator<I1, I2, O, Op>(
2234 &self,
2235 operator: Op,
2236 input_stream1: &Stream<Self, I1>,
2237 input_stream2: &Stream<Self, I2>,
2238 ) -> Stream<Self, O>
2239 where
2240 I1: Data,
2241 I2: Data,
2242 O: Data,
2243 Op: BinaryOperator<I1, I2, O>;
2244
2245 fn add_binary_operator_with_preference<I1, I2, O, Op>(
2249 &self,
2250 operator: Op,
2251 input_stream1: (&Stream<Self, I1>, OwnershipPreference),
2252 input_stream2: (&Stream<Self, I2>, OwnershipPreference),
2253 ) -> Stream<Self, O>
2254 where
2255 I1: Data,
2256 I2: Data,
2257 O: Data,
2258 Op: BinaryOperator<I1, I2, O>;
2259
2260 fn add_ternary_operator<I1, I2, I3, O, Op>(
2262 &self,
2263 operator: Op,
2264 input_stream1: &Stream<Self, I1>,
2265 input_stream2: &Stream<Self, I2>,
2266 input_stream3: &Stream<Self, I3>,
2267 ) -> Stream<Self, O>
2268 where
2269 I1: Data,
2270 I2: Data,
2271 I3: Data,
2272 O: Data,
2273 Op: TernaryOperator<I1, I2, I3, O>;
2274
2275 #[allow(clippy::too_many_arguments)]
2278 fn add_ternary_operator_with_preference<I1, I2, I3, O, Op>(
2279 &self,
2280 operator: Op,
2281 input_stream1: (&Stream<Self, I1>, OwnershipPreference),
2282 input_stream2: (&Stream<Self, I2>, OwnershipPreference),
2283 input_stream3: (&Stream<Self, I3>, OwnershipPreference),
2284 ) -> Stream<Self, O>
2285 where
2286 I1: Data,
2287 I2: Data,
2288 I3: Data,
2289 O: Data,
2290 Op: TernaryOperator<I1, I2, I3, O>;
2291
2292 fn add_quaternary_operator<I1, I2, I3, I4, O, Op>(
2294 &self,
2295 operator: Op,
2296 input_stream1: &Stream<Self, I1>,
2297 input_stream2: &Stream<Self, I2>,
2298 input_stream3: &Stream<Self, I3>,
2299 input_stream4: &Stream<Self, I4>,
2300 ) -> Stream<Self, O>
2301 where
2302 I1: Data,
2303 I2: Data,
2304 I3: Data,
2305 I4: Data,
2306 O: Data,
2307 Op: QuaternaryOperator<I1, I2, I3, I4, O>;
2308
2309 #[allow(clippy::too_many_arguments)]
2312 fn add_quaternary_operator_with_preference<I1, I2, I3, I4, O, Op>(
2313 &self,
2314 operator: Op,
2315 input_stream1: (&Stream<Self, I1>, OwnershipPreference),
2316 input_stream2: (&Stream<Self, I2>, OwnershipPreference),
2317 input_stream3: (&Stream<Self, I3>, OwnershipPreference),
2318 input_stream4: (&Stream<Self, I4>, OwnershipPreference),
2319 ) -> Stream<Self, O>
2320 where
2321 I1: Data,
2322 I2: Data,
2323 I3: Data,
2324 I4: Data,
2325 O: Data,
2326 Op: QuaternaryOperator<I1, I2, I3, I4, O>;
2327
2328 fn add_nary_operator<'a, I, O, Op, Iter>(
2330 &'a self,
2331 operator: Op,
2332 input_streams: Iter,
2333 ) -> Stream<Self, O>
2334 where
2335 I: Data,
2336 O: Data,
2337 Op: NaryOperator<I, O>,
2338 Iter: IntoIterator<Item = &'a Stream<Self, I>>;
2339
2340 fn add_nary_operator_with_preference<'a, I, O, Op, Iter>(
2343 &'a self,
2344 operator: Op,
2345 input_streams: Iter,
2346 input_preference: OwnershipPreference,
2347 ) -> Stream<Self, O>
2348 where
2349 I: Data,
2350 O: Data,
2351 Op: NaryOperator<I, O>,
2352 Iter: IntoIterator<Item = &'a Stream<Self, I>>;
2353
2354 fn add_custom_node<N: Node, R>(
2360 &self,
2361 name: Cow<'static, str>,
2362 constructor: impl FnOnce(NodeId) -> (N, R),
2363 ) -> R;
2364
2365 fn add_feedback<I, O, Op>(
2415 &self,
2416 operator: Op,
2417 ) -> (Stream<Self, O>, FeedbackConnector<Self, I, O, Op>)
2418 where
2419 I: Data,
2420 O: Data,
2421 Op: StrictUnaryOperator<I, O>;
2422
2423 fn add_feedback_persistent<I, O, Op>(
2425 &self,
2426 persistent_id: Option<&str>,
2427 operator: Op,
2428 ) -> (Stream<Self, O>, FeedbackConnector<Self, I, O, Op>)
2429 where
2430 I: Data,
2431 O: Data,
2432 Op: StrictUnaryOperator<I, O>,
2433 {
2434 let (output, feedback) = self.add_feedback(operator);
2435
2436 output.set_persistent_id(persistent_id);
2437
2438 (output, feedback)
2439 }
2440
2441 fn add_feedback_with_export<I, O, Op>(
2455 &self,
2456 operator: Op,
2457 ) -> (ExportStream<Self, O>, FeedbackConnector<Self, I, O, Op>)
2458 where
2459 I: Data,
2460 O: Data,
2461 Op: StrictUnaryOperator<I, O>;
2462
2463 fn add_feedback_with_export_persistent<I, O, Op>(
2465 &self,
2466 persistent_id: Option<&str>,
2467 operator: Op,
2468 ) -> (ExportStream<Self, O>, FeedbackConnector<Self, I, O, Op>)
2469 where
2470 I: Data,
2471 O: Data,
2472 Op: StrictUnaryOperator<I, O>,
2473 {
2474 let (export, feedback) = self.add_feedback_with_export(operator);
2475
2476 export.local.set_persistent_id(persistent_id);
2477
2478 (export, feedback)
2479 }
2480
2481 fn connect_feedback_with_preference<I, O, Op>(
2482 &self,
2483 output_node_id: NodeId,
2484 operator: Rc<RefCell<Op>>,
2485 input_stream: &Stream<Self, I>,
2486 input_preference: OwnershipPreference,
2487 ) where
2488 I: Data,
2489 O: Data,
2490 Op: StrictUnaryOperator<I, O>;
2491
2492 fn iterative_subcircuit<F, T, E>(&self, child_constructor: F) -> Result<T, SchedulerError>
2506 where
2507 F: FnOnce(&mut IterativeCircuit<Self>) -> Result<(T, E), SchedulerError>,
2508 E: Executor<IterativeCircuit<Self>>;
2509
2510 fn non_iterative_subcircuit<F, T, E>(&self, child_constructor: F) -> Result<T, SchedulerError>
2513 where
2514 F: FnOnce(&mut NonIterativeCircuit<Self>) -> Result<(T, E), SchedulerError>,
2515 E: Executor<NonIterativeCircuit<Self>>;
2516
2517 fn iterate<F, C, T>(&self, constructor: F) -> Result<T, SchedulerError>
2588 where
2589 F: FnOnce(&mut IterativeCircuit<Self>) -> Result<(C, T), SchedulerError>,
2590 C: AsyncFn() -> Result<bool, SchedulerError> + 'static;
2591
2592 fn iterate_with_scheduler<F, C, T, S>(&self, constructor: F) -> Result<T, SchedulerError>
2597 where
2598 F: FnOnce(&mut IterativeCircuit<Self>) -> Result<(C, T), SchedulerError>,
2599 C: AsyncFn() -> Result<bool, SchedulerError> + 'static,
2600 S: Scheduler + 'static;
2601
2602 fn fixedpoint<F, T>(&self, constructor: F) -> Result<T, SchedulerError>
2638 where
2639 F: FnOnce(&mut IterativeCircuit<Self>) -> Result<T, SchedulerError>;
2640
2641 fn fixedpoint_with_scheduler<F, T, S>(&self, constructor: F) -> Result<T, SchedulerError>
2646 where
2647 F: FnOnce(&mut IterativeCircuit<Self>) -> Result<T, SchedulerError>,
2648 S: Scheduler + 'static;
2649
2650 fn import_stream<I, O, Op>(
2655 &self,
2656 operator: Op,
2657 parent_stream: &Stream<Self::Parent, I>,
2658 ) -> Stream<Self, O>
2659 where
2660 Self::Parent: Circuit,
2661 I: Data,
2662 O: Data,
2663 Op: ImportOperator<I, O>;
2664
2665 fn import_stream_with_preference<I, O, Op>(
2668 &self,
2669 operator: Op,
2670 parent_stream: &Stream<Self::Parent, I>,
2671 input_preference: OwnershipPreference,
2672 ) -> Stream<Self, O>
2673 where
2674 Self::Parent: Circuit,
2675 I: Data,
2676 O: Data,
2677 Op: ImportOperator<I, O>;
2678}
2679
2680pub struct Edges {
2682 by_source: BTreeMap<NodeId, Vec<Rc<Edge>>>,
2683 by_destination: BTreeMap<NodeId, Vec<Rc<Edge>>>,
2684 by_stream: BTreeMap<Option<StreamId>, Vec<Rc<Edge>>>,
2685}
2686
2687impl Edges {
2688 fn new() -> Self {
2689 Self {
2690 by_source: BTreeMap::new(),
2691 by_destination: BTreeMap::new(),
2692 by_stream: BTreeMap::new(),
2693 }
2694 }
2695
2696 fn add_edge(&mut self, edge: Edge) {
2697 let edge = Rc::new(edge);
2698
2699 self.by_source
2700 .entry(edge.from)
2701 .or_default()
2702 .push(edge.clone());
2703 self.by_destination
2704 .entry(edge.to)
2705 .or_default()
2706 .push(edge.clone());
2707
2708 self.by_stream
2709 .entry(edge.stream.as_ref().map(|s| s.stream_id()))
2710 .or_default()
2711 .push(edge);
2712 }
2713
2714 fn extend<I>(&mut self, edges: I)
2715 where
2716 I: IntoIterator<Item = Edge>,
2717 {
2718 for edge in edges {
2719 self.add_edge(edge)
2720 }
2721 }
2722
2723 pub(crate) fn iter(&self) -> impl Iterator<Item = &Edge> {
2724 self.by_source
2725 .values()
2726 .flat_map(|edges| edges.iter().map(|edge| edge.as_ref()))
2727 }
2728
2729 pub(crate) fn get_by_stream_id(&self, stream_id: &Option<StreamId>) -> Option<&[Rc<Edge>]> {
2730 self.by_stream.get(stream_id).map(|v| v.as_slice())
2731 }
2732
2733 fn delete_stream(&mut self, stream_id: StreamId) {
2734 if let Some(edges) = self.by_stream.remove(&Some(stream_id)) {
2735 for edge in edges {
2736 if let Some(v) = self.by_source.get_mut(&edge.from) {
2737 v.retain(|e| e.stream_id() != Some(stream_id))
2738 }
2739 if let Some(v) = self.by_destination.get_mut(&edge.to) {
2740 v.retain(|e| e.stream_id() != Some(stream_id))
2741 }
2742 }
2743 }
2744 }
2745
2746 pub(crate) fn inputs_of(&self, node_id: NodeId) -> impl Iterator<Item = &Edge> {
2747 self.by_destination
2748 .get(&node_id)
2749 .into_iter()
2750 .flatten()
2751 .map(|edge| edge.as_ref())
2752 }
2753
2754 pub(crate) fn depend_on(&self, node_id: NodeId) -> impl Iterator<Item = &Edge> {
2758 self.by_source.get(&node_id).into_iter().flat_map(|edges| {
2759 edges.iter().filter_map(|edge| {
2760 if edge.is_dependency() {
2761 Some(edge.as_ref())
2762 } else {
2763 None
2764 }
2765 })
2766 })
2767 }
2768
2769 pub(crate) fn dependencies_of(&self, node_id: NodeId) -> impl Iterator<Item = &Edge> {
2773 self.by_destination
2774 .get(&node_id)
2775 .into_iter()
2776 .flat_map(|edges| {
2777 edges.iter().filter_map(|edge| {
2778 if edge.is_dependency() {
2779 Some(edge.as_ref())
2780 } else {
2781 None
2782 }
2783 })
2784 })
2785 }
2786
2787 fn clear(&mut self) {
2788 *self = Self::new();
2789 }
2790}
2791
2792struct CircuitInner<P>
2796where
2797 P: 'static,
2798{
2799 parent: P,
2800
2801 root: Option<RootCircuit>,
2803
2804 root_scope: Scope,
2805
2806 node_id: NodeId,
2808 global_node_id: GlobalNodeId,
2809 nodes: RefCell<Vec<RefCell<Box<dyn Node>>>>,
2810 edges: RefCell<Edges>,
2811 import_nodes: RefCell<Vec<NodeId>>,
2812 circuit_event_handlers: CircuitEventHandlers,
2813 scheduler_event_handlers: SchedulerEventHandlers,
2814 store: RefCell<CircuitCache>,
2815 last_stream_id: RefCell<StreamId>,
2816 metadata_exchange: MetadataExchange,
2817 balancer: Rc<Balancer>,
2818}
2819
2820impl<P> CircuitInner<P>
2821where
2822 P: 'static,
2823{
2824 #[allow(clippy::too_many_arguments)]
2825 fn new(
2826 parent: P,
2827 root: Option<RootCircuit>,
2828 root_scope: Scope,
2829 node_id: NodeId,
2830 global_node_id: GlobalNodeId,
2831 circuit_event_handlers: CircuitEventHandlers,
2832 scheduler_event_handlers: SchedulerEventHandlers,
2833 last_stream_id: RefCell<StreamId>,
2834 ) -> Self {
2835 let metadata_exchange = MetadataExchange::new();
2836
2837 Self {
2838 parent,
2839 root,
2840 root_scope,
2841 node_id,
2842 global_node_id,
2843 nodes: RefCell::new(Vec::new()),
2844 edges: RefCell::new(Edges::new()),
2845 import_nodes: RefCell::new(Vec::new()),
2846 circuit_event_handlers,
2847 scheduler_event_handlers,
2848 store: RefCell::new(TypedMap::new()),
2849 last_stream_id,
2850 metadata_exchange: metadata_exchange.clone(),
2851 balancer: Rc::new(Balancer::new(&metadata_exchange)),
2852 }
2853 }
2854
2855 fn add_edge(&self, edge: Edge) {
2856 self.edges.borrow_mut().add_edge(edge);
2857 }
2858
2859 fn add_node<N>(&self, mut node: N)
2860 where
2861 N: Node + 'static,
2862 {
2863 node.init();
2864 self.nodes
2865 .borrow_mut()
2866 .push(RefCell::new(Box::new(node) as Box<dyn Node>));
2867 }
2868
2869 fn add_import_node(&self, node_id: NodeId) {
2870 self.import_nodes.borrow_mut().push(node_id);
2871 }
2872
2873 fn import_nodes(&self) -> Vec<NodeId> {
2874 self.import_nodes.borrow().clone()
2875 }
2876
2877 fn clear(&self) {
2878 self.nodes.borrow_mut().clear();
2879 self.edges.borrow_mut().clear();
2880 self.store.borrow_mut().clear();
2881 }
2882
2883 fn register_circuit_event_handler<F>(&self, name: &str, handler: F)
2884 where
2885 F: Fn(&CircuitEvent) + 'static,
2886 {
2887 self.circuit_event_handlers.borrow_mut().insert(
2888 name.to_string(),
2889 Box::new(handler) as Box<dyn Fn(&CircuitEvent)>,
2890 );
2891 }
2892
2893 fn unregister_circuit_event_handler(&self, name: &str) -> bool {
2894 self.circuit_event_handlers
2895 .borrow_mut()
2896 .remove(name)
2897 .is_some()
2898 }
2899
2900 fn register_scheduler_event_handler<F>(&self, name: &str, handler: F)
2901 where
2902 F: FnMut(&SchedulerEvent<'_>) + 'static,
2903 {
2904 self.scheduler_event_handlers.borrow_mut().insert(
2905 name.to_string(),
2906 Box::new(handler) as Box<dyn FnMut(&SchedulerEvent<'_>)>,
2907 );
2908 }
2909
2910 fn unregister_scheduler_event_handler(&self, name: &str) -> bool {
2911 self.scheduler_event_handlers
2912 .borrow_mut()
2913 .remove(name)
2914 .is_some()
2915 }
2916
2917 fn log_circuit_event(&self, event: &CircuitEvent) {
2918 for (_, handler) in self.circuit_event_handlers.borrow().iter() {
2919 handler(event)
2920 }
2921 }
2922
2923 fn log_scheduler_event(&self, event: &SchedulerEvent<'_>) {
2924 for (_, handler) in self.scheduler_event_handlers.borrow_mut().iter_mut() {
2925 handler(event)
2926 }
2927 }
2928
2929 fn check_fixedpoint(&self, scope: Scope) -> bool {
2930 self.nodes.borrow().iter().all(|node| {
2931 node.borrow().fixedpoint(scope)
2939 })
2940 }
2941
2942 fn lookup_local_node_by_persistent_id(
2943 &self,
2944 persistent_id: &str,
2945 ) -> Result<GlobalNodeId, DbspError> {
2946 self.nodes
2947 .borrow()
2948 .iter()
2949 .find_map(|node| {
2950 if node.borrow().get_label(LABEL_PERSISTENT_OPERATOR_ID) == Some(persistent_id) {
2951 Some(node.borrow().global_id().clone())
2952 } else {
2953 None
2954 }
2955 })
2956 .ok_or_else(|| {
2957 DbspError::Runtime(RuntimeError::UnknownPersistentId(persistent_id.to_string()))
2958 })
2959 }
2960}
2961
2962pub struct ChildCircuit<P, T>
2968where
2969 P: 'static,
2970 T: Timestamp,
2971{
2972 inner: Rc<CircuitInner<P>>,
2973 time: Rc<RefCell<T>>,
2974}
2975
2976pub type RootCircuit = ChildCircuit<(), ()>;
2991
2992pub type NestedCircuit = ChildCircuit<RootCircuit, <() as Timestamp>::Nested>;
2993
2994pub type IterativeCircuit<P> = ChildCircuit<P, <<P as WithClock>::Time as Timestamp>::Nested>;
2996
2997pub type NonIterativeCircuit<P> = ChildCircuit<P, <P as WithClock>::Time>;
2999
3000impl<P, T> Clone for ChildCircuit<P, T>
3001where
3002 P: 'static,
3003 T: Timestamp,
3004{
3005 fn clone(&self) -> Self {
3006 Self {
3007 inner: self.inner.clone(),
3008 time: self.time.clone(),
3009 }
3010 }
3011}
3012
3013impl<P, T> ChildCircuit<P, T>
3014where
3015 P: 'static,
3016 T: Timestamp,
3017{
3018 fn inner(&self) -> &CircuitInner<P> {
3020 &self.inner
3021 }
3022}
3023
3024impl RootCircuit {
3025 pub fn build<F, T>(constructor: F) -> Result<(CircuitHandle, T), DbspError>
3069 where
3070 F: FnOnce(&mut RootCircuit) -> Result<T, AnyError>,
3071 {
3072 Self::build_with_scheduler::<F, T, DynamicScheduler>(constructor)
3073 }
3074
3075 pub fn build_with_scheduler<F, T, S>(constructor: F) -> Result<(CircuitHandle, T), DbspError>
3081 where
3082 F: FnOnce(&mut RootCircuit) -> Result<T, AnyError>,
3083 S: Scheduler + 'static,
3084 {
3085 let tokio_runtime = tokio::runtime::Builder::new_current_thread()
3089 .build()
3090 .map_err(|e| {
3091 DbspError::Scheduler(SchedulerError::TokioError {
3092 error: e.to_string(),
3093 })
3094 })?;
3095
3096 let mut circuit = RootCircuit::new();
3097 let res = constructor(&mut circuit).map_err(DbspError::Constructor)?;
3098 let mut executor = Box::new(<OnceExecutor<S>>::new()) as Box<dyn Executor<RootCircuit>>;
3099 executor.prepare(&circuit, None)?;
3100
3101 circuit.log_scheduler_event(&SchedulerEvent::clock_start());
3138 circuit.clock_start(0);
3139 Ok((
3140 CircuitHandle {
3141 circuit,
3142 executor,
3143 tokio_runtime,
3144 replay_info: None,
3145 },
3146 res,
3147 ))
3148 }
3149}
3150
3151impl RootCircuit {
3152 fn new() -> Self {
3155 Self {
3156 inner: Rc::new(CircuitInner::new(
3157 (),
3158 None,
3159 0,
3160 NodeId::root(),
3161 GlobalNodeId::root(),
3162 Rc::new(RefCell::new(HashMap::new())),
3163 Rc::new(RefCell::new(HashMap::new())),
3164 RefCell::new(StreamId::new(0)),
3165 )),
3166 time: Rc::new(RefCell::new(())),
3167 }
3168 }
3169}
3170
3171impl RootCircuit {
3172 pub fn register_circuit_event_handler<F>(&self, name: &str, handler: F)
3192 where
3193 F: Fn(&CircuitEvent) + 'static,
3194 {
3195 self.inner().register_circuit_event_handler(name, handler);
3196 }
3197
3198 pub fn unregister_circuit_event_handler(&self, name: &str) -> bool {
3201 self.inner().unregister_circuit_event_handler(name)
3202 }
3203
3204 pub fn register_scheduler_event_handler<F>(&self, name: &str, handler: F)
3219 where
3220 F: FnMut(&SchedulerEvent<'_>) + 'static,
3221 {
3222 self.inner().register_scheduler_event_handler(name, handler);
3223 }
3224
3225 pub fn unregister_scheduler_event_handler(&self, name: &str) -> bool {
3228 self.inner().unregister_scheduler_event_handler(name)
3229 }
3230}
3231
3232impl<P, T> ChildCircuit<P, T>
3233where
3234 P: Circuit,
3235 T: Timestamp,
3236{
3237 fn with_parent(parent: P, id: NodeId) -> Self {
3239 let global_node_id = parent.global_node_id().child(id);
3240 let circuit_handlers = parent.circuit_event_handlers();
3241 let sched_handlers = parent.scheduler_event_handlers();
3242 let root_scope = parent.root_scope() + 1;
3243 let last_stream_id = parent.last_stream_id();
3244
3245 let root = parent.root_circuit();
3246
3247 ChildCircuit {
3248 inner: Rc::new(CircuitInner::new(
3249 parent,
3250 Some(root),
3251 root_scope,
3252 id,
3253 global_node_id,
3254 circuit_handlers,
3255 sched_handlers,
3256 last_stream_id,
3257 )),
3258 time: Rc::new(RefCell::new(Timestamp::clock_start())),
3259 }
3260 }
3261
3262 pub fn is_child_of(&self, other: &P) -> bool {
3264 P::ptr_eq(&self.inner().parent, other)
3265 }
3266}
3267
3268impl<P, T> ChildCircuit<P, T>
3270where
3271 P: 'static,
3272 T: Timestamp,
3273 Self: Circuit,
3274{
3275 fn node_id(&self) -> NodeId {
3277 self.inner().node_id
3278 }
3279
3280 fn add_node<F, N, V>(&self, f: F) -> V
3286 where
3287 F: FnOnce(NodeId) -> (N, V),
3288 N: Node + 'static,
3289 {
3290 let id = self.inner().nodes.borrow().len();
3291
3292 let (node, res) = f(NodeId(id));
3295 self.inner().add_node(node);
3296 res
3297 }
3298
3299 fn add_import_node(&self, node_id: NodeId) {
3300 self.inner().add_import_node(node_id);
3301 }
3302
3303 fn try_add_node<F, N, V, E>(&self, f: F) -> Result<V, E>
3305 where
3306 F: FnOnce(NodeId) -> Result<(N, V), E>,
3307 N: Node + 'static,
3308 {
3309 let id = self.inner().nodes.borrow().len();
3310
3311 let (node, res) = f(NodeId(id))?;
3314 self.inner().add_node(node);
3315 Ok(res)
3316 }
3317
3318 fn log_circuit_event(&self, event: &CircuitEvent) {
3321 self.inner().log_circuit_event(event);
3322 }
3323
3324 pub(super) fn log_scheduler_event(&self, event: &SchedulerEvent<'_>) {
3327 self.inner().log_scheduler_event(event);
3328 }
3329}
3330
3331impl<P, T> CircuitBase for ChildCircuit<P, T>
3332where
3333 P: Clone + 'static,
3334 T: Timestamp,
3335{
3336 fn edges(&self) -> Ref<'_, Edges> {
3337 self.inner().edges.borrow()
3338 }
3339
3340 fn transitive_ancestors(&self) -> BTreeMap<NodeId, BTreeSet<NodeId>> {
3341 let edges = self.edges();
3342 let mut result = BTreeMap::new();
3343
3344 for node_id in self.node_ids() {
3346 let mut ancestors = BTreeSet::new();
3347 let mut queue = vec![node_id];
3348
3349 while let Some(current) = queue.pop() {
3351 for edge in edges.inputs_of(current) {
3352 let ancestor_node = edge.from;
3353 if ancestors.insert(ancestor_node) {
3354 queue.push(ancestor_node);
3355 }
3356 }
3357 }
3358
3359 result.insert(node_id, ancestors);
3360 }
3361
3362 result
3363 }
3364
3365 fn edges_mut(&self) -> RefMut<'_, Edges> {
3366 self.inner().edges.borrow_mut()
3367 }
3368
3369 fn num_nodes(&self) -> usize {
3370 self.inner().nodes.borrow().len()
3371 }
3372
3373 fn clear(&mut self) {
3374 self.inner().clear();
3375 }
3376
3377 fn add_dependency(&self, from: NodeId, to: NodeId) {
3378 self.log_circuit_event(&CircuitEvent::dependency(
3379 self.global_node_id().child(from),
3380 self.global_node_id().child(to),
3381 ));
3382
3383 let origin = self.global_node_id().child(from);
3384 self.inner().add_edge(Edge {
3385 from,
3386 to,
3387 origin,
3388 stream: None,
3389 ownership_preference: None,
3390 });
3391 }
3392
3393 fn map_node_relative(&self, path: &[NodeId], f: &mut dyn FnMut(&dyn Node)) {
3395 let nodes = self.inner().nodes.borrow();
3396 let node = nodes[path[0].0].borrow();
3397 if path.len() == 1 {
3398 f(node.as_ref())
3399 } else {
3400 node.map_child(&path[1..], &mut |node| f(node));
3401 }
3402 }
3403
3404 fn map_node_mut_relative(&self, path: &[NodeId], f: &mut dyn FnMut(&mut dyn Node)) {
3406 let nodes = self.inner().nodes.borrow();
3407 let mut node = nodes[path[0].0].borrow_mut();
3408 if path.len() == 1 {
3409 f(node.as_mut())
3410 } else {
3411 node.map_child_mut(&path[1..], &mut |node| f(node));
3412 }
3413 }
3414
3415 fn map_nodes_recursive(
3416 &self,
3417 f: &mut dyn FnMut(&dyn Node) -> Result<(), DbspError>,
3418 ) -> Result<(), DbspError> {
3419 for node in self.inner().nodes.borrow().iter() {
3420 f(node.borrow().as_ref())?;
3421 node.borrow().map_nodes_recursive(f)?;
3422 }
3423 Ok(())
3424 }
3425
3426 fn map_nodes_recursive_mut(
3427 &mut self,
3428 f: &mut dyn FnMut(&mut dyn Node) -> Result<(), DbspError>,
3429 ) -> Result<(), DbspError> {
3430 for node in self.inner().nodes.borrow_mut().iter_mut() {
3431 f(node.borrow_mut().as_mut())?;
3432 node.borrow_mut().map_nodes_recursive_mut(f)?;
3433 }
3434
3435 Ok(())
3436 }
3437
3438 fn map_local_nodes(
3439 &self,
3440 f: &mut dyn FnMut(&dyn Node) -> Result<(), DbspError>,
3441 ) -> Result<(), DbspError> {
3442 for node in self.inner().nodes.borrow().iter() {
3443 f(node.borrow().as_ref())?;
3444 }
3445 Ok(())
3446 }
3447
3448 fn map_local_nodes_mut(
3449 &self,
3450 f: &mut dyn FnMut(&mut dyn Node) -> Result<(), DbspError>,
3451 ) -> Result<(), DbspError> {
3452 for node in self.inner().nodes.borrow_mut().iter_mut() {
3453 f(node.borrow_mut().as_mut())?;
3454 }
3455
3456 Ok(())
3457 }
3458
3459 fn apply_local_node_mut(&self, id: NodeId, f: &mut dyn FnMut(&mut dyn Node)) {
3460 self.map_node_mut_relative(&[id], &mut |node| f(node));
3461 }
3462
3463 fn map_subcircuits(
3464 &self,
3465 f: &mut dyn FnMut(&dyn CircuitBase) -> Result<(), DbspError>,
3466 ) -> Result<(), DbspError> {
3467 for node in self.inner().nodes.borrow().iter() {
3468 let node = node.borrow();
3469 if let Some(child_circuit) = node.as_circuit() {
3470 f(child_circuit)?;
3471 }
3472 }
3473 Ok(())
3474 }
3475
3476 fn set_node_label(&self, id: &GlobalNodeId, key: &str, val: &str) {
3477 self.map_node_mut(id, &mut |node| node.set_label(key, val));
3478 }
3479
3480 fn get_node_label(&self, id: &GlobalNodeId, key: &str) -> Option<String> {
3481 self.map_node(id, &mut |node| node.get_label(key).map(str::to_string))
3482 }
3483
3484 fn global_id(&self) -> &GlobalNodeId {
3485 &self.inner().global_node_id
3486 }
3487
3488 fn node_ids(&self) -> Vec<NodeId> {
3490 self.inner()
3491 .nodes
3492 .borrow()
3493 .iter()
3494 .map(|node| node.borrow().local_id())
3495 .collect()
3496 }
3497
3498 fn lookup_local_node_by_persistent_id(
3499 &self,
3500 persistent_id: &str,
3501 ) -> Result<GlobalNodeId, DbspError> {
3502 self.inner()
3503 .lookup_local_node_by_persistent_id(persistent_id)
3504 }
3505
3506 fn import_nodes(&self) -> Vec<NodeId> {
3507 self.inner().import_nodes()
3508 }
3509
3510 fn allocate_stream_id(&self) -> StreamId {
3511 let circuit = self.inner();
3512 let mut last_stream_id = circuit.last_stream_id.borrow_mut();
3513 last_stream_id.0 += 1;
3514 *last_stream_id
3515 }
3516
3517 fn last_stream_id(&self) -> RefCell<StreamId> {
3518 self.inner().last_stream_id.clone()
3519 }
3520
3521 fn root_scope(&self) -> Scope {
3522 self.inner().root_scope
3523 }
3524
3525 fn node_id(&self) -> NodeId {
3526 self.inner().node_id
3527 }
3528
3529 fn global_node_id(&self) -> GlobalNodeId {
3530 self.inner().global_node_id.clone()
3531 }
3532
3533 fn check_fixedpoint(&self, scope: Scope) -> bool {
3534 self.inner().check_fixedpoint(scope)
3535 }
3536
3537 fn metadata_exchange(&self) -> &MetadataExchange {
3538 &self.inner().metadata_exchange
3539 }
3540
3541 fn balancer(&self) -> &Balancer {
3542 &self.inner().balancer
3543 }
3544
3545 fn set_auto_rebalance(&self, enable: bool) -> Result<(), DbspError> {
3546 self.inner().balancer.set_auto_rebalance(enable)
3547 }
3548
3549 fn set_balancer_hint_by_global_id(
3550 &self,
3551 global_node_id: &GlobalNodeId,
3552 hint: BalancerHint,
3553 ) -> Result<(), DbspError> {
3554 if global_node_id.parent_id() != Some(GlobalNodeId::root()) {
3555 return Err(DbspError::Balancer(BalancerError::NonTopLevelNode(
3556 global_node_id.clone(),
3557 )));
3558 }
3559
3560 self.inner()
3561 .balancer
3562 .set_hint(global_node_id.local_node_id().unwrap(), hint)
3563 }
3564
3565 fn set_balancer_hint(&self, persistent_id: &str, hint: BalancerHint) -> Result<(), DbspError> {
3566 let global_node_id = self.lookup_local_node_by_persistent_id(persistent_id)?;
3567 self.set_balancer_hint_by_global_id(&global_node_id, hint)
3568 }
3569
3570 fn get_current_balancer_policies(&self) -> BTreeMap<NodeId, PartitioningPolicy> {
3571 self.inner().balancer.get_policy()
3572 }
3573
3574 fn get_current_balancer_policy(
3575 &self,
3576 persistent_id: &str,
3577 ) -> Result<PartitioningPolicy, DbspError> {
3578 let global_node_id = self.lookup_local_node_by_persistent_id(persistent_id)?;
3579 let local_node_id = global_node_id.local_node_id().unwrap();
3580 self.inner()
3581 .balancer
3582 .get_policy_for_stream(local_node_id)
3583 .ok_or_else(|| {
3584 DbspError::Balancer(BalancerError::NotRegisteredWithBalancer(local_node_id))
3585 })
3586 }
3587
3588 fn rebalance(&self) {
3589 self.inner().balancer.rebalance()
3590 }
3591
3592 fn start_compaction(&self) {
3593 let _ = self.map_local_nodes_mut(&mut |node| {
3594 node.start_compaction();
3595 Ok(())
3596 });
3597 }
3598}
3599
3600impl<P, T> Circuit for ChildCircuit<P, T>
3601where
3602 P: Clone + 'static,
3603 T: Timestamp,
3604{
3605 type Parent = P;
3606
3607 fn parent(&self) -> P {
3608 self.inner().parent.clone()
3609 }
3610
3611 fn root_circuit(&self) -> RootCircuit {
3612 if <dyn Any>::is::<RootCircuit>(self) {
3613 unsafe { transmute::<&Self, &RootCircuit>(self) }.clone()
3614 } else {
3615 self.inner().root.as_ref().unwrap().clone()
3616 }
3617 }
3618
3619 fn map_node<V>(&self, id: &GlobalNodeId, f: &mut dyn FnMut(&dyn Node) -> V) -> V {
3620 let path = id.path();
3621 let mut result: Option<V> = None;
3622
3623 assert!(path.starts_with(self.global_id().path()));
3624
3625 self.map_node_relative(
3626 path.strip_prefix(self.global_id().path()).unwrap(),
3627 &mut |node| result = Some(f(node)),
3628 );
3629 result.unwrap()
3630 }
3631
3632 fn map_node_mut<V>(&self, id: &GlobalNodeId, f: &mut dyn FnMut(&mut dyn Node) -> V) -> V {
3633 let path = id.path();
3634 let mut result: Option<V> = None;
3635
3636 assert!(path.starts_with(self.global_id().path()));
3637
3638 self.map_node_mut_relative(
3639 path.strip_prefix(self.global_id().path()).unwrap(),
3640 &mut |node| result = Some(f(node)),
3641 );
3642 result.unwrap()
3643 }
3644
3645 fn map_local_node_mut<V>(&self, id: NodeId, f: &mut dyn FnMut(&mut dyn Node) -> V) -> V {
3646 let mut result: Option<V> = None;
3647
3648 self.map_node_mut_relative(&[id], &mut |node| result = Some(f(node)));
3649 result.unwrap()
3650 }
3651
3652 fn ptr_eq(this: &Self, other: &Self) -> bool {
3653 Rc::ptr_eq(&this.inner, &other.inner)
3654 }
3655
3656 fn circuit_event_handlers(&self) -> CircuitEventHandlers {
3657 self.inner().circuit_event_handlers.clone()
3658 }
3659
3660 fn scheduler_event_handlers(&self) -> SchedulerEventHandlers {
3661 self.inner().scheduler_event_handlers.clone()
3662 }
3663
3664 fn log_circuit_event(&self, event: &CircuitEvent) {
3665 self.inner().log_circuit_event(event);
3666 }
3667
3668 fn log_scheduler_event(&self, event: &SchedulerEvent<'_>) {
3669 self.inner().log_scheduler_event(event);
3670 }
3671
3672 fn cache_get_or_insert_with<K, F>(&self, key: K, mut f: F) -> RefMut<'_, K::Value>
3673 where
3674 K: 'static + TypedMapKey<CircuitStoreMarker>,
3675 F: FnMut() -> K::Value,
3676 {
3677 if self.inner().store.borrow().contains_key(&key) {
3680 return RefMut::map(self.inner().store.borrow_mut(), |store| {
3681 store.get_mut(&key).unwrap()
3682 });
3683 }
3684
3685 let new = f();
3686
3687 RefMut::map(self.inner().store.borrow_mut(), |store| {
3690 store.entry(key).or_insert(new)
3691 })
3692 }
3693
3694 fn connect_stream<V: 'static>(
3695 &self,
3696 stream: &Stream<Self, V>,
3697 to: NodeId,
3698 ownership_preference: OwnershipPreference,
3699 ) {
3700 self.log_circuit_event(&CircuitEvent::stream(
3701 stream.origin_node_id().clone(),
3702 self.global_node_id().child(to),
3703 ownership_preference,
3704 ));
3705
3706 debug_assert_eq!(self.global_node_id(), stream.circuit.global_node_id());
3707 self.inner().add_edge(Edge {
3708 from: stream.local_node_id(),
3709 to,
3710 origin: stream.origin_node_id().clone(),
3711 stream: Some(Box::new(stream.clone())),
3712 ownership_preference: Some(ownership_preference),
3713 });
3714 }
3715
3716 fn tick(&self) {
3717 let mut time = self.time.borrow_mut();
3718 *time = time.advance(0);
3719 }
3720
3721 fn clock_start(&self, scope: Scope) {
3722 for node in self.inner().nodes.borrow_mut().iter_mut() {
3723 node.borrow_mut().clock_start(scope);
3724 }
3725 }
3726
3727 fn clock_end(&self, scope: Scope) {
3728 for node in self.inner().nodes.borrow_mut().iter_mut() {
3729 node.borrow_mut().clock_end(scope);
3730 }
3731
3732 let mut time = self.time.borrow_mut();
3733 *time = time.advance(scope + 1);
3734 }
3735
3736 fn ready(&self, id: NodeId) -> bool {
3737 self.inner().nodes.borrow()[id.0].borrow().ready()
3738 }
3739
3740 fn cache_insert<K>(&self, key: K, val: K::Value)
3741 where
3742 K: TypedMapKey<CircuitStoreMarker> + 'static,
3743 {
3744 self.inner().store.borrow_mut().insert(key, val);
3745 }
3746
3747 fn cache_contains<K>(&self, key: &K) -> bool
3748 where
3749 K: TypedMapKey<CircuitStoreMarker> + 'static,
3750 {
3751 self.inner().store.borrow().contains_key(key)
3752 }
3753
3754 fn cache_get<K>(&self, key: &K) -> Option<K::Value>
3755 where
3756 K: TypedMapKey<CircuitStoreMarker> + 'static,
3757 K::Value: Clone,
3758 {
3759 self.inner().store.borrow().get(key).cloned()
3760 }
3761
3762 fn register_ready_callback(&self, id: NodeId, cb: Box<dyn Fn() + Send + Sync>) {
3763 self.inner().nodes.borrow()[id.0]
3764 .borrow_mut()
3765 .register_ready_callback(cb);
3766 }
3767
3768 fn is_async_node(&self, id: NodeId) -> bool {
3769 self.inner().nodes.borrow()[id.0].borrow().is_async()
3770 }
3771
3772 #[allow(clippy::await_holding_refcell_ref)]
3774 async fn eval_node(&self, id: NodeId) -> Result<Option<Position>, SchedulerError> {
3775 let circuit = self.inner();
3776 debug_assert!(id.0 < circuit.nodes.borrow().len());
3777
3778 circuit.log_scheduler_event(&SchedulerEvent::eval_start(
3783 circuit.nodes.borrow()[id.0].borrow().as_ref(),
3784 ));
3785
3786 let span = Span::new("eval")
3787 .with_category("Operator")
3788 .with_tooltip(|| {
3789 let nodes = circuit.nodes.borrow();
3790 let node = nodes[id.0].borrow();
3791 format!("{} {}", node.name(), node.global_id().node_identifier())
3792 });
3793 let (result, duration) = Timed::new(circuit.nodes.borrow()[id.0].borrow_mut().eval()).await;
3794 let progress = result?;
3795 span.record();
3796
3797 circuit.log_scheduler_event(&SchedulerEvent::eval_end(
3798 circuit.nodes.borrow()[id.0].borrow().as_ref(),
3799 duration,
3800 ));
3801
3802 Ok(progress)
3803 }
3804
3805 #[allow(clippy::await_holding_refcell_ref)]
3807 async fn eval_import_node(&self, id: NodeId) {
3808 let circuit = self.inner();
3809 debug_assert!(id.0 < circuit.nodes.borrow().len());
3810 debug_assert!(circuit.import_nodes().contains(&id));
3811
3812 circuit.nodes.borrow()[id.0].borrow_mut().import().await;
3819 }
3820
3821 fn flush_node(&self, id: NodeId) {
3822 let circuit = self.inner();
3823 debug_assert!(id.0 < circuit.nodes.borrow().len());
3824
3825 circuit.nodes.borrow()[id.0].borrow_mut().flush();
3826 }
3827
3828 fn is_flush_complete(&self, id: NodeId) -> bool {
3829 let circuit = self.inner();
3830 debug_assert!(id.0 < circuit.nodes.borrow().len());
3831
3832 circuit.nodes.borrow()[id.0].borrow().is_flush_complete()
3833 }
3834
3835 #[track_caller]
3836 fn region<F, V>(&self, name: &str, f: F) -> V
3837 where
3838 F: FnOnce() -> V,
3839 {
3840 self.log_circuit_event(&CircuitEvent::push_region(name, Some(Location::caller())));
3841 let res = f();
3842 self.log_circuit_event(&CircuitEvent::pop_region());
3843 res
3844 }
3845
3846 #[track_caller]
3847 fn open_region(&self, name: RegionName) {
3848 self.log_circuit_event(&CircuitEvent::open_region(name, Some(Location::caller())));
3849 }
3850
3851 fn close_region(&self, name: RegionName) {
3852 self.log_circuit_event(&CircuitEvent::close_region(name));
3853 }
3854
3855 fn add_preprocessor(&self, preprocessor_node_id: NodeId) {
3856 for node in self.inner().nodes.borrow_mut().iter() {
3857 if node.borrow().is_input() {
3858 self.add_dependency(preprocessor_node_id, node.borrow().local_id());
3859 }
3860 }
3861 }
3862
3863 fn add_source<O, Op>(&self, operator: Op) -> Stream<Self, O>
3865 where
3866 O: Data,
3867 Op: SourceOperator<O>,
3868 {
3869 self.add_node(|id| {
3870 self.log_circuit_event(&CircuitEvent::operator(
3871 GlobalNodeId::child_of(self, id),
3872 operator.name(),
3873 operator.location(),
3874 ));
3875
3876 let node = SourceNode::new(operator, self.clone(), id);
3877 let output_stream = node.output_stream();
3878 (node, output_stream)
3879 })
3880 }
3881
3882 fn add_exchange<I, SndOp, O, RcvOp>(
3883 &self,
3884 sender: SndOp,
3885 receiver: RcvOp,
3886 input_stream: &Stream<Self, I>,
3887 ) -> Stream<Self, O>
3888 where
3889 I: Data,
3890 O: Data,
3891 SndOp: SinkOperator<I>,
3892 RcvOp: SourceOperator<O>,
3893 {
3894 let preference = sender.input_preference();
3895 self.add_exchange_with_preference(sender, receiver, input_stream, preference)
3896 }
3897
3898 fn add_exchange_with_preference<I, SndOp, O, RcvOp>(
3899 &self,
3900 sender: SndOp,
3901 receiver: RcvOp,
3902 input_stream: &Stream<Self, I>,
3903 input_preference: OwnershipPreference,
3904 ) -> Stream<Self, O>
3905 where
3906 I: Data,
3907 O: Data,
3908 SndOp: SinkOperator<I>,
3909 RcvOp: SourceOperator<O>,
3910 {
3911 let sender_id = self.add_node(|id| {
3912 self.log_circuit_event(&CircuitEvent::operator(
3913 GlobalNodeId::child_of(self, id),
3914 sender.name(),
3915 sender.location(),
3916 ));
3917
3918 let node = SinkNode::new(sender, input_stream.clone(), self.clone(), id);
3919 self.connect_stream(input_stream, id, input_preference);
3920 (node, id)
3921 });
3922
3923 let output_stream = self.add_node(|id| {
3924 self.log_circuit_event(&CircuitEvent::operator(
3925 GlobalNodeId::child_of(self, id),
3926 receiver.name(),
3927 receiver.location(),
3928 ));
3929
3930 let node = SourceNode::new(receiver, self.clone(), id);
3931 let output_stream = node.output_stream();
3932 (node, output_stream)
3933 });
3934
3935 self.add_dependency(sender_id, output_stream.local_node_id());
3936 output_stream
3937 }
3938
3939 fn add_sink<I, Op>(&self, operator: Op, input_stream: &Stream<Self, I>) -> GlobalNodeId
3940 where
3941 I: Data,
3942 Op: SinkOperator<I>,
3943 {
3944 let preference = operator.input_preference();
3945 self.add_sink_with_preference(operator, input_stream, preference)
3946 }
3947
3948 fn add_sink_with_preference<I, Op>(
3949 &self,
3950 operator: Op,
3951 input_stream: &Stream<Self, I>,
3952 input_preference: OwnershipPreference,
3953 ) -> GlobalNodeId
3954 where
3955 I: Data,
3956 Op: SinkOperator<I>,
3957 {
3958 self.add_node(|id| {
3959 let global_node_id = GlobalNodeId::child_of(self, id);
3960 self.log_circuit_event(&CircuitEvent::operator(
3963 global_node_id.clone(),
3964 operator.name(),
3965 operator.location(),
3966 ));
3967
3968 self.connect_stream(input_stream, id, input_preference);
3969 (
3970 SinkNode::new(operator, input_stream.clone(), self.clone(), id),
3971 global_node_id,
3972 )
3973 })
3974 }
3975
3976 fn add_binary_sink<I1, I2, Op>(
3978 &self,
3979 operator: Op,
3980 input_stream1: &Stream<Self, I1>,
3981 input_stream2: &Stream<Self, I2>,
3982 ) where
3983 I1: Data,
3984 I2: Data,
3985 Op: BinarySinkOperator<I1, I2>,
3986 {
3987 let (preference1, preference2) = operator.input_preference();
3988 self.add_binary_sink_with_preference(
3989 operator,
3990 (input_stream1, preference1),
3991 (input_stream2, preference2),
3992 )
3993 }
3994
3995 fn add_binary_sink_with_preference<I1, I2, Op>(
3996 &self,
3997 operator: Op,
3998 input_stream1: (&Stream<Self, I1>, OwnershipPreference),
3999 input_stream2: (&Stream<Self, I2>, OwnershipPreference),
4000 ) where
4001 I1: Data,
4002 I2: Data,
4003 Op: BinarySinkOperator<I1, I2>,
4004 {
4005 let (input_stream1, input_preference1) = input_stream1;
4006 let (input_stream2, input_preference2) = input_stream2;
4007
4008 self.add_node(|id| {
4009 self.log_circuit_event(&CircuitEvent::operator(
4010 GlobalNodeId::child_of(self, id),
4011 operator.name(),
4012 operator.location(),
4013 ));
4014
4015 let node = BinarySinkNode::new(
4016 operator,
4017 input_stream1.clone(),
4018 input_stream2.clone(),
4019 self.clone(),
4020 id,
4021 );
4022 self.connect_stream(input_stream1, id, input_preference1);
4023 self.connect_stream(input_stream2, id, input_preference2);
4024 (node, ())
4025 });
4026 }
4027
4028 fn add_ternary_sink<I1, I2, I3, Op>(
4030 &self,
4031 operator: Op,
4032 input_stream1: &Stream<Self, I1>,
4033 input_stream2: &Stream<Self, I2>,
4034 input_stream3: &Stream<Self, I3>,
4035 ) -> GlobalNodeId
4036 where
4037 I1: Data,
4038 I2: Data,
4039 I3: Data,
4040 Op: TernarySinkOperator<I1, I2, I3>,
4041 {
4042 let (preference1, preference2, preference3) = operator.input_preference();
4043 self.add_ternary_sink_with_preference(
4044 operator,
4045 (input_stream1, preference1),
4046 (input_stream2, preference2),
4047 (input_stream3, preference3),
4048 )
4049 }
4050
4051 fn add_ternary_sink_with_preference<I1, I2, I3, Op>(
4052 &self,
4053 operator: Op,
4054 input_stream1: (&Stream<Self, I1>, OwnershipPreference),
4055 input_stream2: (&Stream<Self, I2>, OwnershipPreference),
4056 input_stream3: (&Stream<Self, I3>, OwnershipPreference),
4057 ) -> GlobalNodeId
4058 where
4059 I1: Data,
4060 I2: Data,
4061 I3: Data,
4062 Op: TernarySinkOperator<I1, I2, I3>,
4063 {
4064 let (input_stream1, input_preference1) = input_stream1;
4065 let (input_stream2, input_preference2) = input_stream2;
4066 let (input_stream3, input_preference3) = input_stream3;
4067
4068 self.add_node(|id| {
4069 let global_node_id = GlobalNodeId::child_of(self, id);
4070
4071 self.log_circuit_event(&CircuitEvent::operator(
4072 GlobalNodeId::child_of(self, id),
4073 operator.name(),
4074 operator.location(),
4075 ));
4076
4077 let node = TernarySinkNode::new(
4078 operator,
4079 input_stream1.clone(),
4080 input_stream2.clone(),
4081 input_stream3.clone(),
4082 self.clone(),
4083 id,
4084 );
4085 self.connect_stream(input_stream1, id, input_preference1);
4086 self.connect_stream(input_stream2, id, input_preference2);
4087 self.connect_stream(input_stream3, id, input_preference3);
4088 (node, global_node_id)
4089 })
4090 }
4091
4092 fn add_unary_operator<I, O, Op>(
4093 &self,
4094 operator: Op,
4095 input_stream: &Stream<Self, I>,
4096 ) -> Stream<Self, O>
4097 where
4098 I: Data,
4099 O: Data,
4100 Op: UnaryOperator<I, O>,
4101 {
4102 let preference = operator.input_preference();
4103 self.add_unary_operator_with_preference(operator, input_stream, preference)
4104 }
4105
4106 fn add_unary_operator_with_preference<I, O, Op>(
4107 &self,
4108 operator: Op,
4109 input_stream: &Stream<Self, I>,
4110 input_preference: OwnershipPreference,
4111 ) -> Stream<Self, O>
4112 where
4113 I: Data,
4114 O: Data,
4115 Op: UnaryOperator<I, O>,
4116 {
4117 self.add_node(|id| {
4118 self.log_circuit_event(&CircuitEvent::operator(
4119 GlobalNodeId::child_of(self, id),
4120 operator.name(),
4121 operator.location(),
4122 ));
4123
4124 let node = UnaryNode::new(operator, input_stream.clone(), self.clone(), id);
4125 let output_stream = node.output_stream();
4126 self.connect_stream(input_stream, id, input_preference);
4127 (node, output_stream)
4128 })
4129 }
4130
4131 fn add_binary_operator<I1, I2, O, Op>(
4132 &self,
4133 operator: Op,
4134 input_stream1: &Stream<Self, I1>,
4135 input_stream2: &Stream<Self, I2>,
4136 ) -> Stream<Self, O>
4137 where
4138 I1: Data,
4139 I2: Data,
4140 O: Data,
4141 Op: BinaryOperator<I1, I2, O>,
4142 {
4143 let (pref1, pref2) = operator.input_preference();
4144 self.add_binary_operator_with_preference(
4145 operator,
4146 (input_stream1, pref1),
4147 (input_stream2, pref2),
4148 )
4149 }
4150
4151 fn add_binary_operator_with_preference<I1, I2, O, Op>(
4152 &self,
4153 operator: Op,
4154 input_stream1: (&Stream<Self, I1>, OwnershipPreference),
4155 input_stream2: (&Stream<Self, I2>, OwnershipPreference),
4156 ) -> Stream<Self, O>
4157 where
4158 I1: Data,
4159 I2: Data,
4160 O: Data,
4161 Op: BinaryOperator<I1, I2, O>,
4162 {
4163 let (input_stream1, input_preference1) = input_stream1;
4164 let (input_stream2, input_preference2) = input_stream2;
4165
4166 self.add_node(|id| {
4167 self.log_circuit_event(&CircuitEvent::operator(
4168 GlobalNodeId::child_of(self, id),
4169 operator.name(),
4170 operator.location(),
4171 ));
4172
4173 let node = BinaryNode::new(
4174 operator,
4175 input_stream1.clone(),
4176 input_stream2.clone(),
4177 self.clone(),
4178 id,
4179 );
4180 let output_stream = node.output_stream();
4181 self.connect_stream(input_stream1, id, input_preference1);
4182 self.connect_stream(input_stream2, id, input_preference2);
4183 (node, output_stream)
4184 })
4185 }
4186
4187 fn add_ternary_operator<I1, I2, I3, O, Op>(
4188 &self,
4189 operator: Op,
4190 input_stream1: &Stream<Self, I1>,
4191 input_stream2: &Stream<Self, I2>,
4192 input_stream3: &Stream<Self, I3>,
4193 ) -> Stream<Self, O>
4194 where
4195 I1: Data,
4196 I2: Data,
4197 I3: Data,
4198 O: Data,
4199 Op: TernaryOperator<I1, I2, I3, O>,
4200 {
4201 let (pref1, pref2, pref3) = operator.input_preference();
4202 self.add_ternary_operator_with_preference(
4203 operator,
4204 (input_stream1, pref1),
4205 (input_stream2, pref2),
4206 (input_stream3, pref3),
4207 )
4208 }
4209
4210 #[allow(clippy::too_many_arguments)]
4211 fn add_ternary_operator_with_preference<I1, I2, I3, O, Op>(
4212 &self,
4213 operator: Op,
4214 input_stream1: (&Stream<Self, I1>, OwnershipPreference),
4215 input_stream2: (&Stream<Self, I2>, OwnershipPreference),
4216 input_stream3: (&Stream<Self, I3>, OwnershipPreference),
4217 ) -> Stream<Self, O>
4218 where
4219 I1: Data,
4220 I2: Data,
4221 I3: Data,
4222 O: Data,
4223 Op: TernaryOperator<I1, I2, I3, O>,
4224 {
4225 let (input_stream1, input_preference1) = input_stream1;
4226 let (input_stream2, input_preference2) = input_stream2;
4227 let (input_stream3, input_preference3) = input_stream3;
4228
4229 self.add_node(|id| {
4230 self.log_circuit_event(&CircuitEvent::operator(
4231 GlobalNodeId::child_of(self, id),
4232 operator.name(),
4233 operator.location(),
4234 ));
4235
4236 let node = TernaryNode::new(
4237 operator,
4238 input_stream1.clone(),
4239 input_stream2.clone(),
4240 input_stream3.clone(),
4241 self.clone(),
4242 id,
4243 );
4244 let output_stream = node.output_stream();
4245 self.connect_stream(input_stream1, id, input_preference1);
4246 self.connect_stream(input_stream2, id, input_preference2);
4247 self.connect_stream(input_stream3, id, input_preference3);
4248 (node, output_stream)
4249 })
4250 }
4251
4252 fn add_quaternary_operator<I1, I2, I3, I4, O, Op>(
4253 &self,
4254 operator: Op,
4255 input_stream1: &Stream<Self, I1>,
4256 input_stream2: &Stream<Self, I2>,
4257 input_stream3: &Stream<Self, I3>,
4258 input_stream4: &Stream<Self, I4>,
4259 ) -> Stream<Self, O>
4260 where
4261 I1: Data,
4262 I2: Data,
4263 I3: Data,
4264 I4: Data,
4265 O: Data,
4266 Op: QuaternaryOperator<I1, I2, I3, I4, O>,
4267 {
4268 let (pref1, pref2, pref3, pref4) = operator.input_preference();
4269 self.add_quaternary_operator_with_preference(
4270 operator,
4271 (input_stream1, pref1),
4272 (input_stream2, pref2),
4273 (input_stream3, pref3),
4274 (input_stream4, pref4),
4275 )
4276 }
4277
4278 #[allow(clippy::too_many_arguments)]
4279 fn add_quaternary_operator_with_preference<I1, I2, I3, I4, O, Op>(
4280 &self,
4281 operator: Op,
4282 input_stream1: (&Stream<Self, I1>, OwnershipPreference),
4283 input_stream2: (&Stream<Self, I2>, OwnershipPreference),
4284 input_stream3: (&Stream<Self, I3>, OwnershipPreference),
4285 input_stream4: (&Stream<Self, I4>, OwnershipPreference),
4286 ) -> Stream<Self, O>
4287 where
4288 I1: Data,
4289 I2: Data,
4290 I3: Data,
4291 I4: Data,
4292 O: Data,
4293 Op: QuaternaryOperator<I1, I2, I3, I4, O>,
4294 {
4295 let (input_stream1, input_preference1) = input_stream1;
4296 let (input_stream2, input_preference2) = input_stream2;
4297 let (input_stream3, input_preference3) = input_stream3;
4298 let (input_stream4, input_preference4) = input_stream4;
4299
4300 self.add_node(|id| {
4301 self.log_circuit_event(&CircuitEvent::operator(
4302 GlobalNodeId::child_of(self, id),
4303 operator.name(),
4304 operator.location(),
4305 ));
4306
4307 let node = QuaternaryNode::new(
4308 operator,
4309 input_stream1.clone(),
4310 input_stream2.clone(),
4311 input_stream3.clone(),
4312 input_stream4.clone(),
4313 self.clone(),
4314 id,
4315 );
4316 let output_stream = node.output_stream();
4317 self.connect_stream(input_stream1, id, input_preference1);
4318 self.connect_stream(input_stream2, id, input_preference2);
4319 self.connect_stream(input_stream3, id, input_preference3);
4320 self.connect_stream(input_stream4, id, input_preference4);
4321 (node, output_stream)
4322 })
4323 }
4324
4325 fn add_nary_operator<'a, I, O, Op, Iter>(
4326 &'a self,
4327 operator: Op,
4328 input_streams: Iter,
4329 ) -> Stream<Self, O>
4330 where
4331 I: Data,
4332 O: Data,
4333 Op: NaryOperator<I, O>,
4334 Iter: IntoIterator<Item = &'a Stream<Self, I>>,
4335 {
4336 let pref = operator.input_preference();
4337 self.add_nary_operator_with_preference(operator, input_streams, pref)
4338 }
4339
4340 fn add_nary_operator_with_preference<'a, I, O, Op, Iter>(
4341 &'a self,
4342 operator: Op,
4343 input_streams: Iter,
4344 input_preference: OwnershipPreference,
4345 ) -> Stream<Self, O>
4346 where
4347 I: Data,
4348 O: Data,
4349 Op: NaryOperator<I, O>,
4350 Iter: IntoIterator<Item = &'a Stream<Self, I>>,
4351 {
4352 let input_streams: Vec<Stream<_, _>> = input_streams.into_iter().cloned().collect();
4353 self.add_node(|id| {
4354 self.log_circuit_event(&CircuitEvent::operator(
4355 GlobalNodeId::child_of(self, id),
4356 operator.name(),
4357 operator.location(),
4358 ));
4359
4360 let node = NaryNode::new(operator, input_streams.clone(), self.clone(), id);
4361 let output_stream = node.output_stream();
4362 for stream in input_streams.iter() {
4363 self.connect_stream(stream, id, input_preference);
4364 }
4365 (node, output_stream)
4366 })
4367 }
4368
4369 #[track_caller]
4370 fn add_custom_node<N: Node, R>(
4371 &self,
4372 name: Cow<'static, str>,
4373 constructor: impl FnOnce(NodeId) -> (N, R),
4374 ) -> R {
4375 self.add_node(|id| {
4376 self.log_circuit_event(&CircuitEvent::operator(
4379 GlobalNodeId::child_of(self, id),
4380 name,
4381 Some(Location::caller()),
4382 ));
4383 let (node, res) = constructor(id);
4384 (node, res)
4385 })
4386 }
4387
4388 fn add_feedback<I, O, Op>(
4389 &self,
4390 operator: Op,
4391 ) -> (Stream<Self, O>, FeedbackConnector<Self, I, O, Op>)
4392 where
4393 I: Data,
4394 O: Data,
4395 Op: StrictUnaryOperator<I, O>,
4396 {
4397 self.add_node(|id| {
4398 self.log_circuit_event(&CircuitEvent::strict_operator_output(
4399 GlobalNodeId::child_of(self, id),
4400 operator.name(),
4401 operator.location(),
4402 ));
4403
4404 let operator = Rc::new(RefCell::new(operator));
4405 let connector = FeedbackConnector::new(id, self.clone(), operator.clone());
4406 let output_node = FeedbackOutputNode::new(operator, self.clone(), id);
4407 let local = output_node.output_stream();
4408 (output_node, (local, connector))
4409 })
4410 }
4411
4412 fn add_feedback_with_export<I, O, Op>(
4413 &self,
4414 operator: Op,
4415 ) -> (ExportStream<Self, O>, FeedbackConnector<Self, I, O, Op>)
4416 where
4417 I: Data,
4418 O: Data,
4419 Op: StrictUnaryOperator<I, O>,
4420 {
4421 self.add_node(|id| {
4422 self.log_circuit_event(&CircuitEvent::strict_operator_output(
4423 GlobalNodeId::child_of(self, id),
4424 operator.name(),
4425 operator.location(),
4426 ));
4427
4428 let operator = Rc::new(RefCell::new(operator));
4429 let connector = FeedbackConnector::new(id, self.clone(), operator.clone());
4430 let output_node = FeedbackOutputNode::with_export(operator, self.clone(), id);
4431 let local = output_node.output_stream();
4432 let export = output_node.export_stream.clone().unwrap();
4433 (output_node, (ExportStream { local, export }, connector))
4434 })
4435 }
4436
4437 fn connect_feedback_with_preference<I, O, Op>(
4441 &self,
4442 output_node_id: NodeId,
4443 operator: Rc<RefCell<Op>>,
4444 input_stream: &Stream<Self, I>,
4445 input_preference: OwnershipPreference,
4446 ) where
4447 I: Data,
4448 O: Data,
4449 Op: StrictUnaryOperator<I, O>,
4450 {
4451 self.add_node(|id| {
4452 self.log_circuit_event(&CircuitEvent::strict_operator_input(
4453 GlobalNodeId::child_of(self, id),
4454 output_node_id,
4455 ));
4456
4457 let output_node = FeedbackInputNode::new(operator, input_stream.clone(), id);
4458 self.connect_stream(input_stream, id, input_preference);
4459 self.add_dependency(output_node_id, id);
4460 (output_node, ())
4461 })
4462 }
4463
4464 fn iterative_subcircuit<F, V, E>(&self, child_constructor: F) -> Result<V, SchedulerError>
4465 where
4466 F: FnOnce(&mut IterativeCircuit<Self>) -> Result<(V, E), SchedulerError>,
4467 E: Executor<IterativeCircuit<Self>>,
4468 {
4469 self.try_add_node(|id| {
4470 let global_id = GlobalNodeId::child_of(self, id);
4471 self.log_circuit_event(&CircuitEvent::subcircuit(global_id.clone(), true));
4472 let mut child_circuit = ChildCircuit::with_parent(self.clone(), id);
4473 let (res, executor) = child_constructor(&mut child_circuit)?;
4474 let child = <ChildNode<IterativeCircuit<Self>>>::new::<E>(child_circuit, 1, executor);
4475 self.log_circuit_event(&CircuitEvent::subcircuit_complete(global_id));
4476 Ok((child, res))
4477 })
4478 }
4479
4480 fn non_iterative_subcircuit<F, V, E>(&self, child_constructor: F) -> Result<V, SchedulerError>
4481 where
4482 F: FnOnce(&mut NonIterativeCircuit<Self>) -> Result<(V, E), SchedulerError>,
4483 E: Executor<NonIterativeCircuit<Self>>,
4484 {
4485 self.try_add_node(|id| {
4486 let global_id = GlobalNodeId::child_of(self, id);
4487 self.log_circuit_event(&CircuitEvent::subcircuit(global_id.clone(), false));
4488 let mut child_circuit = ChildCircuit::with_parent(self.clone(), id);
4489 let (res, executor) = child_constructor(&mut child_circuit)?;
4490 let child =
4491 <ChildNode<NonIterativeCircuit<Self>>>::new::<E>(child_circuit, 0, executor);
4492 self.log_circuit_event(&CircuitEvent::subcircuit_complete(global_id));
4493 Ok((child, res))
4494 })
4495 }
4496
4497 fn iterate<F, C, V>(&self, constructor: F) -> Result<V, SchedulerError>
4498 where
4499 F: FnOnce(&mut IterativeCircuit<Self>) -> Result<(C, V), SchedulerError>,
4500 C: AsyncFn() -> Result<bool, SchedulerError> + 'static,
4501 {
4502 self.iterate_with_scheduler::<F, C, V, DynamicScheduler>(constructor)
4503 }
4504
4505 fn iterate_with_scheduler<F, C, V, S>(&self, constructor: F) -> Result<V, SchedulerError>
4510 where
4511 F: FnOnce(&mut IterativeCircuit<Self>) -> Result<(C, V), SchedulerError>,
4512 C: AsyncFn() -> Result<bool, SchedulerError> + 'static,
4513 S: Scheduler + 'static,
4514 {
4515 self.iterative_subcircuit(|child| {
4516 let (termination_check, res) = constructor(child)?;
4517 let mut executor = <IterativeExecutor<_, S>>::new(termination_check);
4518 executor.prepare(child, None)?;
4519 Ok((res, executor))
4520 })
4521 }
4522
4523 fn fixedpoint<F, V>(&self, constructor: F) -> Result<V, SchedulerError>
4524 where
4525 F: FnOnce(&mut IterativeCircuit<Self>) -> Result<V, SchedulerError>,
4526 {
4527 self.fixedpoint_with_scheduler::<F, V, DynamicScheduler>(constructor)
4528 }
4529
4530 fn fixedpoint_with_scheduler<F, V, S>(&self, constructor: F) -> Result<V, SchedulerError>
4531 where
4532 F: FnOnce(&mut IterativeCircuit<Self>) -> Result<V, SchedulerError>,
4533 S: Scheduler + 'static,
4534 {
4535 self.iterative_subcircuit(|child| {
4536 let res = constructor(child)?;
4537 let child_clone = child.clone();
4538
4539 let consensus = Consensus::new();
4540
4541 let termination_check = async move || {
4542 let local_fixedpoint = child_clone.inner().check_fixedpoint(0);
4544 consensus.check(local_fixedpoint).await
4545 };
4546 let mut executor = <IterativeExecutor<_, S>>::new(termination_check);
4547 executor.prepare(child, None)?;
4548 Ok((res, executor))
4549 })
4550 }
4551
4552 fn import_stream<I, O, Op>(&self, operator: Op, parent_stream: &Stream<P, I>) -> Stream<Self, O>
4553 where
4554 Self::Parent: Circuit,
4555 I: Data,
4556 O: Data,
4557 Op: ImportOperator<I, O>,
4558 {
4559 let preference = operator.input_preference();
4560 self.import_stream_with_preference(operator, parent_stream, preference)
4561 }
4562
4563 fn import_stream_with_preference<I, O, Op>(
4564 &self,
4565 operator: Op,
4566 parent_stream: &Stream<P, I>,
4567 input_preference: OwnershipPreference,
4568 ) -> Stream<Self, O>
4569 where
4570 Self::Parent: Circuit,
4571 I: Data,
4572 O: Data,
4573 Op: ImportOperator<I, O>,
4574 {
4575 assert!(self.is_child_of(parent_stream.circuit()));
4576
4577 let output_stream = self.add_node(|id| {
4578 let node_id = self.global_node_id().child(id);
4579 self.log_circuit_event(&CircuitEvent::operator(
4580 node_id.clone(),
4581 operator.name(),
4582 operator.location(),
4583 ));
4584 let node = ImportNode::new(operator, self.clone(), parent_stream.clone(), id);
4585 self.parent()
4587 .connect_stream(parent_stream, self.node_id(), input_preference);
4588 self.parent().log_circuit_event(&CircuitEvent::stream(
4590 parent_stream.origin_node_id().clone(),
4591 node_id.clone(),
4592 input_preference,
4593 ));
4594 let output_stream = node.output_stream();
4595 (node, output_stream)
4596 });
4597
4598 self.add_import_node(output_stream.local_node_id());
4599
4600 output_stream
4601 }
4602
4603 fn add_replay_edges(&self, stream_id: StreamId, replay_stream: &dyn StreamMetadata) {
4604 let mut edges = self.edges_mut();
4605 let mut new_edges = Vec::new();
4606
4607 let Some(edges_to_replay) = edges.get_by_stream_id(&Some(stream_id)) else {
4608 return;
4609 };
4610
4611 for edge in edges_to_replay {
4612 new_edges.push(Edge {
4619 from: replay_stream.local_node_id(),
4620 to: edge.to,
4621 origin: replay_stream.origin_node_id().clone(),
4622 stream: Some(clone_box(replay_stream)),
4623 ownership_preference: edge.ownership_preference,
4624 });
4625 }
4626
4627 edges.extend(new_edges);
4628 }
4629}
4630struct ImportNode<C, I, O, Op>
4631where
4632 C: Circuit,
4633{
4634 id: GlobalNodeId,
4635 operator: Op,
4636 parent_stream: Stream<C::Parent, I>,
4637 output_stream: Stream<C, O>,
4638 labels: BTreeMap<String, String>,
4639}
4640
4641impl<C, I, O, Op> ImportNode<C, I, O, Op>
4642where
4643 C: Circuit,
4644 C::Parent: Circuit,
4645 I: Clone + 'static,
4646 O: Clone + 'static,
4647 Op: ImportOperator<I, O>,
4648{
4649 fn new(operator: Op, circuit: C, parent_stream: Stream<C::Parent, I>, id: NodeId) -> Self {
4650 assert!(Circuit::ptr_eq(&circuit.parent(), parent_stream.circuit()));
4651
4652 Self {
4653 id: circuit.global_node_id().child(id),
4654 operator,
4655 parent_stream,
4656 output_stream: Stream::new(circuit, id),
4657 labels: BTreeMap::new(),
4658 }
4659 }
4660
4661 fn output_stream(&self) -> Stream<C, O> {
4662 self.output_stream.clone()
4663 }
4664}
4665
4666impl<C, I, O, Op> Node for ImportNode<C, I, O, Op>
4667where
4668 C: Circuit,
4669 C::Parent: Circuit,
4670 I: Clone + 'static,
4671 O: Clone + 'static,
4672 Op: ImportOperator<I, O>,
4673{
4674 fn name(&self) -> Cow<'static, str> {
4675 self.operator.name()
4676 }
4677
4678 fn local_id(&self) -> NodeId {
4679 self.id.local_node_id().unwrap()
4680 }
4681
4682 fn global_id(&self) -> &GlobalNodeId {
4683 &self.id
4684 }
4685
4686 fn is_async(&self) -> bool {
4687 self.operator.is_async()
4688 }
4689
4690 fn is_input(&self) -> bool {
4691 self.operator.is_input()
4692 }
4693
4694 fn ready(&self) -> bool {
4695 self.operator.ready()
4696 }
4697
4698 fn register_ready_callback(&mut self, cb: Box<dyn Fn() + Send + Sync>) {
4699 self.operator.register_ready_callback(cb);
4700 }
4701
4702 fn eval<'a>(
4703 &'a mut self,
4704 ) -> Pin<Box<dyn Future<Output = Result<Option<Position>, SchedulerError>> + 'a>> {
4705 Box::pin(async {
4706 self.output_stream.put(self.operator.eval().await);
4707 Ok(self.operator.flush_progress())
4708 })
4709 }
4710
4711 #[allow(clippy::await_holding_refcell_ref)]
4714 fn import<'a>(&'a mut self) -> Pin<Box<dyn Future<Output = ()> + 'a>> {
4715 Box::pin(async {
4716 match StreamValue::take(self.parent_stream.val()) {
4717 None => {
4718 self.operator
4719 .import(StreamValue::peek(&self.parent_stream.get()))
4720 .await
4721 }
4722 Some(val) => self.operator.import_owned(val).await,
4723 }
4724
4725 StreamValue::consume_token(self.parent_stream.val());
4726 })
4727 }
4728
4729 fn start_transaction(&mut self) {
4730 self.operator.start_transaction();
4731 }
4732
4733 fn flush(&mut self) {
4734 self.operator.flush();
4735 }
4736
4737 fn is_flush_complete(&self) -> bool {
4738 self.operator.is_flush_complete()
4739 }
4740
4741 fn clock_start(&mut self, scope: Scope) {
4742 self.operator.clock_start(scope);
4743 }
4744
4745 fn clock_end(&mut self, scope: Scope) {
4746 self.operator.clock_end(scope);
4747 }
4748
4749 fn init(&mut self) {
4750 self.operator.init(&self.id);
4751 }
4752
4753 fn metadata(&self, output: &mut OperatorMeta) {
4754 self.operator.metadata(output);
4755 }
4756
4757 fn fixedpoint(&self, scope: Scope) -> bool {
4758 self.operator.fixedpoint(scope)
4759 }
4760
4761 fn checkpoint(
4762 &mut self,
4763 base: &StoragePath,
4764 files: &mut Vec<Arc<dyn FileCommitter>>,
4765 ) -> Result<(), DbspError> {
4766 self.operator
4767 .checkpoint(base, self.persistent_id().as_deref(), files)
4768 }
4769
4770 fn restore(&mut self, base: &StoragePath) -> Result<(), DbspError> {
4771 self.operator.restore(base, self.persistent_id().as_deref())
4772 }
4773
4774 fn start_compaction(&mut self) {
4775 self.operator.start_compaction()
4776 }
4777
4778 fn clear_state(&mut self) -> Result<(), DbspError> {
4779 self.operator.clear_state()
4780 }
4781
4782 fn start_replay(&mut self) -> Result<(), DbspError> {
4783 self.operator.start_replay()
4784 }
4785
4786 fn end_replay(&mut self) -> Result<(), DbspError> {
4787 self.operator.end_replay()
4788 }
4789
4790 fn is_replay_complete(&self) -> bool {
4791 self.operator.is_replay_complete()
4792 }
4793
4794 fn set_label(&mut self, key: &str, value: &str) {
4795 self.labels.insert(key.to_string(), value.to_string());
4796 }
4797
4798 fn get_label(&self, key: &str) -> Option<&str> {
4799 self.labels.get(key).map(|s| s.as_str())
4800 }
4801
4802 fn labels(&self) -> &BTreeMap<String, String> {
4803 &self.labels
4804 }
4805
4806 fn as_any(&self) -> &dyn Any {
4807 self
4808 }
4809}
4810
4811struct SourceNode<C, O, Op> {
4812 id: GlobalNodeId,
4813 operator: Op,
4814 output_stream: Stream<C, O>,
4815 labels: BTreeMap<String, String>,
4816}
4817
4818impl<C, O, Op> SourceNode<C, O, Op>
4819where
4820 Op: SourceOperator<O>,
4821 C: Circuit,
4822{
4823 fn new(operator: Op, circuit: C, id: NodeId) -> Self {
4824 Self {
4825 id: circuit.global_node_id().child(id),
4826 operator,
4827 output_stream: Stream::new(circuit, id),
4828 labels: BTreeMap::new(),
4829 }
4830 }
4831
4832 fn output_stream(&self) -> Stream<C, O> {
4833 self.output_stream.clone()
4834 }
4835}
4836
4837impl<C, O, Op> Node for SourceNode<C, O, Op>
4838where
4839 C: Circuit,
4840 O: Clone + 'static,
4841 Op: SourceOperator<O>,
4842{
4843 fn name(&self) -> Cow<'static, str> {
4844 self.operator.name()
4845 }
4846
4847 fn local_id(&self) -> NodeId {
4848 self.id.local_node_id().unwrap()
4849 }
4850
4851 fn global_id(&self) -> &GlobalNodeId {
4852 &self.id
4853 }
4854
4855 fn is_async(&self) -> bool {
4856 self.operator.is_async()
4857 }
4858
4859 fn is_input(&self) -> bool {
4860 self.operator.is_input()
4861 }
4862
4863 fn ready(&self) -> bool {
4864 self.operator.ready()
4865 }
4866
4867 fn register_ready_callback(&mut self, cb: Box<dyn Fn() + Send + Sync>) {
4868 self.operator.register_ready_callback(cb);
4869 }
4870
4871 fn eval<'a>(
4872 &'a mut self,
4873 ) -> Pin<Box<dyn Future<Output = Result<Option<Position>, SchedulerError>> + 'a>> {
4874 Box::pin(async {
4875 self.output_stream.put(self.operator.eval().await);
4876 Ok(self.operator.flush_progress())
4877 })
4878 }
4879
4880 fn start_transaction(&mut self) {
4881 self.operator.start_transaction();
4882 }
4883
4884 fn flush(&mut self) {
4885 self.operator.flush();
4886 }
4887
4888 fn is_flush_complete(&self) -> bool {
4889 self.operator.is_flush_complete()
4890 }
4891
4892 fn clock_start(&mut self, scope: Scope) {
4893 self.operator.clock_start(scope);
4894 }
4895
4896 fn clock_end(&mut self, scope: Scope) {
4897 self.operator.clock_end(scope);
4898 }
4899
4900 fn init(&mut self) {
4901 self.operator.init(&self.id);
4902 }
4903
4904 fn metadata(&self, output: &mut OperatorMeta) {
4905 self.operator.metadata(output);
4906 }
4907
4908 fn fixedpoint(&self, scope: Scope) -> bool {
4909 self.operator.fixedpoint(scope)
4910 }
4911
4912 fn checkpoint(
4913 &mut self,
4914 base: &StoragePath,
4915 files: &mut Vec<Arc<dyn FileCommitter>>,
4916 ) -> Result<(), DbspError> {
4917 self.operator
4918 .checkpoint(base, self.persistent_id().as_deref(), files)
4919 }
4920
4921 fn restore(&mut self, base: &StoragePath) -> Result<(), DbspError> {
4922 self.operator.restore(base, self.persistent_id().as_deref())
4923 }
4924
4925 fn start_compaction(&mut self) {
4926 self.operator.start_compaction()
4927 }
4928
4929 fn clear_state(&mut self) -> Result<(), DbspError> {
4930 self.operator.clear_state()
4931 }
4932
4933 fn start_replay(&mut self) -> Result<(), DbspError> {
4934 self.operator.start_replay()
4935 }
4936
4937 fn is_replay_complete(&self) -> bool {
4938 self.operator.is_replay_complete()
4939 }
4940
4941 fn end_replay(&mut self) -> Result<(), DbspError> {
4942 self.operator.end_replay()
4943 }
4944
4945 fn set_label(&mut self, key: &str, value: &str) {
4946 self.labels.insert(key.to_string(), value.to_string());
4947 }
4948
4949 fn get_label(&self, key: &str) -> Option<&str> {
4950 self.labels.get(key).map(|s| s.as_str())
4951 }
4952
4953 fn labels(&self) -> &BTreeMap<String, String> {
4954 &self.labels
4955 }
4956
4957 fn as_any(&self) -> &dyn Any {
4958 self
4959 }
4960}
4961
4962struct UnaryNode<C, I, O, Op> {
4963 id: GlobalNodeId,
4964 operator: Op,
4965 input_stream: Stream<C, I>,
4966 output_stream: Stream<C, O>,
4967 labels: BTreeMap<String, String>,
4968}
4969
4970impl<C, I, O, Op> UnaryNode<C, I, O, Op>
4971where
4972 Op: UnaryOperator<I, O>,
4973 C: Circuit,
4974{
4975 fn new(operator: Op, input_stream: Stream<C, I>, circuit: C, id: NodeId) -> Self {
4976 Self {
4977 id: circuit.global_node_id().child(id),
4978 operator,
4979 input_stream,
4980 output_stream: Stream::new(circuit, id),
4981 labels: BTreeMap::new(),
4982 }
4983 }
4984
4985 fn output_stream(&self) -> Stream<C, O> {
4986 self.output_stream.clone()
4987 }
4988}
4989
4990impl<C, I, O, Op> Node for UnaryNode<C, I, O, Op>
4991where
4992 C: Circuit,
4993 I: Clone + 'static,
4994 O: Clone + 'static,
4995 Op: UnaryOperator<I, O>,
4996{
4997 fn name(&self) -> Cow<'static, str> {
4998 self.operator.name()
4999 }
5000
5001 fn local_id(&self) -> NodeId {
5002 self.id.local_node_id().unwrap()
5003 }
5004
5005 fn global_id(&self) -> &GlobalNodeId {
5006 &self.id
5007 }
5008
5009 fn is_async(&self) -> bool {
5010 self.operator.is_async()
5011 }
5012
5013 fn is_input(&self) -> bool {
5014 self.operator.is_input()
5015 }
5016
5017 fn ready(&self) -> bool {
5018 self.operator.ready()
5019 }
5020
5021 fn register_ready_callback(&mut self, cb: Box<dyn Fn() + Send + Sync>) {
5022 self.operator.register_ready_callback(cb);
5023 }
5024
5025 #[allow(clippy::await_holding_refcell_ref)]
5027 fn eval<'a>(
5028 &'a mut self,
5029 ) -> Pin<Box<dyn Future<Output = Result<Option<Position>, SchedulerError>> + 'a>> {
5030 Box::pin(async {
5031 self.output_stream
5032 .put(match StreamValue::take(self.input_stream.val()) {
5033 Some(v) => self.operator.eval_owned(v).await,
5034 None => {
5035 self.operator
5036 .eval(StreamValue::peek(&self.input_stream.get()))
5037 .await
5038 }
5039 });
5040 StreamValue::consume_token(self.input_stream.val());
5041 Ok(self.operator.flush_progress())
5042 })
5043 }
5044
5045 fn start_transaction(&mut self) {
5046 self.operator.start_transaction();
5047 }
5048
5049 fn flush(&mut self) {
5050 self.operator.flush();
5051 }
5052
5053 fn is_flush_complete(&self) -> bool {
5054 self.operator.is_flush_complete()
5055 }
5056
5057 fn clock_start(&mut self, scope: Scope) {
5058 self.operator.clock_start(scope);
5059 }
5060
5061 fn clock_end(&mut self, scope: Scope) {
5062 self.operator.clock_end(scope);
5063 }
5064
5065 fn init(&mut self) {
5066 self.operator.init(&self.id);
5067 }
5068
5069 fn metadata(&self, output: &mut OperatorMeta) {
5070 self.operator.metadata(output);
5071 }
5072
5073 fn fixedpoint(&self, scope: Scope) -> bool {
5074 self.operator.fixedpoint(scope)
5075 }
5076
5077 fn checkpoint(
5078 &mut self,
5079 base: &StoragePath,
5080 files: &mut Vec<Arc<dyn FileCommitter>>,
5081 ) -> Result<(), DbspError> {
5082 self.operator
5083 .checkpoint(base, self.persistent_id().as_deref(), files)
5084 }
5085
5086 fn restore(&mut self, base: &StoragePath) -> Result<(), DbspError> {
5087 self.operator.restore(base, self.persistent_id().as_deref())
5088 }
5089
5090 fn start_compaction(&mut self) {
5091 self.operator.start_compaction()
5092 }
5093
5094 fn clear_state(&mut self) -> Result<(), DbspError> {
5095 self.operator.clear_state()
5096 }
5097
5098 fn start_replay(&mut self) -> Result<(), DbspError> {
5099 self.operator.start_replay()
5100 }
5101
5102 fn is_replay_complete(&self) -> bool {
5103 self.operator.is_replay_complete()
5104 }
5105
5106 fn end_replay(&mut self) -> Result<(), DbspError> {
5107 self.operator.end_replay()
5108 }
5109
5110 fn set_label(&mut self, key: &str, value: &str) {
5111 self.labels.insert(key.to_string(), value.to_string());
5112 }
5113
5114 fn get_label(&self, key: &str) -> Option<&str> {
5115 self.labels.get(key).map(|s| s.as_str())
5116 }
5117
5118 fn labels(&self) -> &BTreeMap<String, String> {
5119 &self.labels
5120 }
5121
5122 fn as_any(&self) -> &dyn Any {
5123 self
5124 }
5125}
5126
5127struct SinkNode<C, I, Op> {
5128 id: GlobalNodeId,
5129 operator: Op,
5130 input_stream: Stream<C, I>,
5131 labels: BTreeMap<String, String>,
5132}
5133
5134impl<C, I, Op> SinkNode<C, I, Op>
5135where
5136 Op: SinkOperator<I>,
5137 C: Circuit,
5138{
5139 fn new(operator: Op, input_stream: Stream<C, I>, circuit: C, id: NodeId) -> Self {
5140 Self {
5141 id: circuit.global_node_id().child(id),
5142 operator,
5143 input_stream,
5144 labels: BTreeMap::new(),
5145 }
5146 }
5147}
5148
5149impl<C, I, Op> Node for SinkNode<C, I, Op>
5150where
5151 C: Circuit,
5152 I: Clone + 'static,
5153 Op: SinkOperator<I>,
5154{
5155 fn name(&self) -> Cow<'static, str> {
5156 self.operator.name()
5157 }
5158
5159 fn local_id(&self) -> NodeId {
5160 self.id.local_node_id().unwrap()
5161 }
5162
5163 fn global_id(&self) -> &GlobalNodeId {
5164 &self.id
5165 }
5166
5167 fn is_async(&self) -> bool {
5168 self.operator.is_async()
5169 }
5170
5171 fn is_input(&self) -> bool {
5172 self.operator.is_input()
5173 }
5174
5175 fn ready(&self) -> bool {
5176 self.operator.ready()
5177 }
5178
5179 fn register_ready_callback(&mut self, cb: Box<dyn Fn() + Send + Sync>) {
5180 self.operator.register_ready_callback(cb);
5181 }
5182
5183 #[allow(clippy::await_holding_refcell_ref)]
5185 fn eval<'a>(
5186 &'a mut self,
5187 ) -> Pin<Box<dyn Future<Output = Result<Option<Position>, SchedulerError>> + 'a>> {
5188 Box::pin(async {
5189 match StreamValue::take(self.input_stream.val()) {
5190 Some(v) => self.operator.eval_owned(v).await,
5191 None => {
5192 self.operator
5193 .eval(StreamValue::peek(&self.input_stream.get()))
5194 .await
5195 }
5196 };
5197 StreamValue::consume_token(self.input_stream.val());
5198
5199 Ok(self.operator.flush_progress())
5200 })
5201 }
5202
5203 fn start_transaction(&mut self) {
5204 self.operator.start_transaction();
5205 }
5206
5207 fn flush(&mut self) {
5208 self.operator.flush();
5209 }
5210
5211 fn is_flush_complete(&self) -> bool {
5212 self.operator.is_flush_complete()
5213 }
5214
5215 fn clock_start(&mut self, scope: Scope) {
5216 self.operator.clock_start(scope);
5217 }
5218
5219 fn clock_end(&mut self, scope: Scope) {
5220 self.operator.clock_end(scope);
5221 }
5222
5223 fn init(&mut self) {
5224 self.operator.init(&self.id);
5225 }
5226
5227 fn metadata(&self, output: &mut OperatorMeta) {
5228 self.operator.metadata(output);
5229 }
5230
5231 fn fixedpoint(&self, scope: Scope) -> bool {
5232 self.operator.fixedpoint(scope)
5233 }
5234
5235 fn checkpoint(
5236 &mut self,
5237 base: &StoragePath,
5238 files: &mut Vec<Arc<dyn FileCommitter>>,
5239 ) -> Result<(), DbspError> {
5240 self.operator
5241 .checkpoint(base, self.persistent_id().as_deref(), files)
5242 }
5243
5244 fn restore(&mut self, base: &StoragePath) -> Result<(), DbspError> {
5245 self.operator.restore(base, self.persistent_id().as_deref())
5246 }
5247
5248 fn start_compaction(&mut self) {
5249 self.operator.start_compaction()
5250 }
5251
5252 fn clear_state(&mut self) -> Result<(), DbspError> {
5253 self.operator.clear_state()
5254 }
5255
5256 fn start_replay(&mut self) -> Result<(), DbspError> {
5257 self.operator.start_replay()
5258 }
5259
5260 fn is_replay_complete(&self) -> bool {
5261 self.operator.is_replay_complete()
5262 }
5263
5264 fn end_replay(&mut self) -> Result<(), DbspError> {
5265 self.operator.end_replay()
5266 }
5267
5268 fn set_label(&mut self, key: &str, value: &str) {
5269 self.labels.insert(key.to_string(), value.to_string());
5270 }
5271
5272 fn get_label(&self, key: &str) -> Option<&str> {
5273 self.labels.get(key).map(|s| s.as_str())
5274 }
5275
5276 fn labels(&self) -> &BTreeMap<String, String> {
5277 &self.labels
5278 }
5279
5280 fn as_any(&self) -> &dyn Any {
5281 self
5282 }
5283}
5284
5285struct BinarySinkNode<C, I1, I2, Op> {
5286 id: GlobalNodeId,
5287 operator: Op,
5288 input_stream1: Stream<C, I1>,
5289 input_stream2: Stream<C, I2>,
5290 is_alias: bool,
5292 labels: BTreeMap<String, String>,
5293}
5294
5295impl<C, I1, I2, Op> BinarySinkNode<C, I1, I2, Op>
5296where
5297 I1: Clone,
5298 I2: Clone,
5299 Op: BinarySinkOperator<I1, I2>,
5300 C: Circuit,
5301{
5302 fn new(
5303 operator: Op,
5304 input_stream1: Stream<C, I1>,
5305 input_stream2: Stream<C, I2>,
5306 circuit: C,
5307 id: NodeId,
5308 ) -> Self {
5309 let is_alias = input_stream1.ptr_eq(&input_stream2);
5310 Self {
5311 id: circuit.global_node_id().child(id),
5312 operator,
5313 input_stream1,
5314 input_stream2,
5315 is_alias,
5316 labels: BTreeMap::new(),
5317 }
5318 }
5319}
5320
5321impl<C, I1, I2, Op> Node for BinarySinkNode<C, I1, I2, Op>
5322where
5323 C: Circuit,
5324 I1: Clone + 'static,
5325 I2: Clone + 'static,
5326 Op: BinarySinkOperator<I1, I2>,
5327{
5328 fn name(&self) -> Cow<'static, str> {
5329 self.operator.name()
5330 }
5331
5332 fn local_id(&self) -> NodeId {
5333 self.id.local_node_id().unwrap()
5334 }
5335
5336 fn global_id(&self) -> &GlobalNodeId {
5337 &self.id
5338 }
5339
5340 fn is_async(&self) -> bool {
5341 self.operator.is_async()
5342 }
5343
5344 fn is_input(&self) -> bool {
5345 self.operator.is_input()
5346 }
5347
5348 fn ready(&self) -> bool {
5349 self.operator.ready()
5350 }
5351
5352 fn register_ready_callback(&mut self, cb: Box<dyn Fn() + Send + Sync>) {
5353 self.operator.register_ready_callback(cb);
5354 }
5355
5356 #[allow(clippy::await_holding_refcell_ref)]
5358 fn eval<'a>(
5359 &'a mut self,
5360 ) -> Pin<Box<dyn Future<Output = Result<Option<Position>, SchedulerError>> + 'a>> {
5361 Box::pin(async {
5362 if self.is_alias {
5363 {
5364 let val1 = self.input_stream1.get();
5365 let val2 = self.input_stream2.get();
5366 self.operator
5367 .eval(
5368 Cow::Borrowed(StreamValue::peek(&val1)),
5369 Cow::Borrowed(StreamValue::peek(&val2)),
5370 )
5371 .await;
5372 }
5373
5374 StreamValue::consume_token(self.input_stream1.val());
5375 StreamValue::consume_token(self.input_stream2.val());
5376 } else {
5377 let val1 = StreamValue::take(self.input_stream1.val());
5378 let val2 = StreamValue::take(self.input_stream2.val());
5379
5380 match (val1, val2) {
5381 (Some(val1), Some(val2)) => {
5382 self.operator.eval(Cow::Owned(val1), Cow::Owned(val2)).await;
5383 }
5384 (Some(val1), None) => {
5385 self.operator
5386 .eval(
5387 Cow::Owned(val1),
5388 Cow::Borrowed(StreamValue::peek(&self.input_stream2.get())),
5389 )
5390 .await;
5391 }
5392 (None, Some(val2)) => {
5393 self.operator
5394 .eval(
5395 Cow::Borrowed(StreamValue::peek(&self.input_stream1.get())),
5396 Cow::Owned(val2),
5397 )
5398 .await;
5399 }
5400 (None, None) => {
5401 self.operator
5402 .eval(
5403 Cow::Borrowed(StreamValue::peek(&self.input_stream1.get())),
5404 Cow::Borrowed(StreamValue::peek(&self.input_stream2.get())),
5405 )
5406 .await;
5407 }
5408 }
5409
5410 StreamValue::consume_token(self.input_stream1.val());
5411 StreamValue::consume_token(self.input_stream2.val());
5412 };
5413
5414 Ok(self.operator.flush_progress())
5415 })
5416 }
5417
5418 fn start_transaction(&mut self) {
5419 self.operator.start_transaction();
5420 }
5421
5422 fn flush(&mut self) {
5423 self.operator.flush();
5424 }
5425
5426 fn is_flush_complete(&self) -> bool {
5427 self.operator.is_flush_complete()
5428 }
5429
5430 fn clock_start(&mut self, scope: Scope) {
5431 self.operator.clock_start(scope);
5432 }
5433
5434 fn clock_end(&mut self, scope: Scope) {
5435 self.operator.clock_end(scope);
5436 }
5437
5438 fn init(&mut self) {
5439 self.operator.init(&self.id);
5440 }
5441
5442 fn metadata(&self, output: &mut OperatorMeta) {
5443 self.operator.metadata(output);
5444 }
5445
5446 fn fixedpoint(&self, scope: Scope) -> bool {
5447 self.operator.fixedpoint(scope)
5448 }
5449
5450 fn checkpoint(
5451 &mut self,
5452 base: &StoragePath,
5453 files: &mut Vec<Arc<dyn FileCommitter>>,
5454 ) -> Result<(), DbspError> {
5455 self.operator
5456 .checkpoint(base, self.persistent_id().as_deref(), files)
5457 }
5458
5459 fn restore(&mut self, base: &StoragePath) -> Result<(), DbspError> {
5460 self.operator.restore(base, self.persistent_id().as_deref())
5461 }
5462
5463 fn start_compaction(&mut self) {
5464 self.operator.start_compaction()
5465 }
5466
5467 fn clear_state(&mut self) -> Result<(), DbspError> {
5468 self.operator.clear_state()
5469 }
5470
5471 fn start_replay(&mut self) -> Result<(), DbspError> {
5472 self.operator.start_replay()
5473 }
5474
5475 fn is_replay_complete(&self) -> bool {
5476 self.operator.is_replay_complete()
5477 }
5478
5479 fn end_replay(&mut self) -> Result<(), DbspError> {
5480 self.operator.end_replay()
5481 }
5482
5483 fn set_label(&mut self, key: &str, value: &str) {
5484 self.labels.insert(key.to_string(), value.to_string());
5485 }
5486
5487 fn get_label(&self, key: &str) -> Option<&str> {
5488 self.labels.get(key).map(|s| s.as_str())
5489 }
5490
5491 fn labels(&self) -> &BTreeMap<String, String> {
5492 &self.labels
5493 }
5494
5495 fn as_any(&self) -> &dyn Any {
5496 self
5497 }
5498}
5499
5500struct TernarySinkNode<C, I1, I2, I3, Op> {
5501 id: GlobalNodeId,
5502 operator: Op,
5503 input_stream1: Stream<C, I1>,
5504 input_stream2: Stream<C, I2>,
5505 input_stream3: Stream<C, I3>,
5506 labels: BTreeMap<String, String>,
5507}
5508
5509impl<C, I1, I2, I3, Op> TernarySinkNode<C, I1, I2, I3, Op>
5510where
5511 I1: Clone,
5512 I2: Clone,
5513 I3: Clone,
5514 Op: TernarySinkOperator<I1, I2, I3>,
5515 C: Circuit,
5516{
5517 fn new(
5518 operator: Op,
5519 input_stream1: Stream<C, I1>,
5520 input_stream2: Stream<C, I2>,
5521 input_stream3: Stream<C, I3>,
5522 circuit: C,
5523 id: NodeId,
5524 ) -> Self {
5525 assert!(!input_stream1.ptr_eq(&input_stream2));
5526 assert!(!input_stream1.ptr_eq(&input_stream3));
5527 assert!(!input_stream2.ptr_eq(&input_stream3));
5528
5529 Self {
5530 id: circuit.global_node_id().child(id),
5531 operator,
5532 input_stream1,
5533 input_stream2,
5534 input_stream3,
5535 labels: BTreeMap::new(),
5536 }
5537 }
5538}
5539
5540impl<C, I1, I2, I3, Op> Node for TernarySinkNode<C, I1, I2, I3, Op>
5541where
5542 C: Circuit,
5543 I1: Clone + 'static,
5544 I2: Clone + 'static,
5545 I3: Clone + 'static,
5546 Op: TernarySinkOperator<I1, I2, I3>,
5547{
5548 fn name(&self) -> Cow<'static, str> {
5549 self.operator.name()
5550 }
5551
5552 fn local_id(&self) -> NodeId {
5553 self.id.local_node_id().unwrap()
5554 }
5555
5556 fn global_id(&self) -> &GlobalNodeId {
5557 &self.id
5558 }
5559
5560 fn is_async(&self) -> bool {
5561 self.operator.is_async()
5562 }
5563
5564 fn is_input(&self) -> bool {
5565 self.operator.is_input()
5566 }
5567
5568 fn ready(&self) -> bool {
5569 self.operator.ready()
5570 }
5571
5572 fn register_ready_callback(&mut self, cb: Box<dyn Fn() + Send + Sync>) {
5573 self.operator.register_ready_callback(cb);
5574 }
5575
5576 #[allow(clippy::await_holding_refcell_ref)]
5578 fn eval<'a>(
5579 &'a mut self,
5580 ) -> Pin<Box<dyn Future<Output = Result<Option<Position>, SchedulerError>> + 'a>> {
5581 Box::pin(async {
5582 let val1 = StreamValue::take(self.input_stream1.val()).map(|val| Cow::Owned(val));
5583 let r1 = self.input_stream1.get();
5584 let val2 = StreamValue::take(self.input_stream2.val()).map(|val| Cow::Owned(val));
5585 let r2 = self.input_stream2.get();
5586 let val3 = StreamValue::take(self.input_stream3.val()).map(|val| Cow::Owned(val));
5587 let r3 = self.input_stream3.get();
5588
5589 self.operator
5590 .eval(
5591 val1.unwrap_or_else(|| Cow::Borrowed(StreamValue::peek(&r1))),
5592 val2.unwrap_or_else(|| Cow::Borrowed(StreamValue::peek(&r2))),
5593 val3.unwrap_or_else(|| Cow::Borrowed(StreamValue::peek(&r3))),
5594 )
5595 .await;
5596
5597 drop(r1);
5598 drop(r2);
5599 drop(r3);
5600
5601 StreamValue::consume_token(self.input_stream1.val());
5602 StreamValue::consume_token(self.input_stream2.val());
5603 StreamValue::consume_token(self.input_stream3.val());
5604
5605 Ok(self.operator.flush_progress())
5606 })
5607 }
5608
5609 fn start_transaction(&mut self) {
5610 self.operator.start_transaction();
5611 }
5612
5613 fn flush(&mut self) {
5614 self.operator.flush();
5615 }
5616
5617 fn is_flush_complete(&self) -> bool {
5618 self.operator.is_flush_complete()
5619 }
5620
5621 fn clock_start(&mut self, scope: Scope) {
5622 self.operator.clock_start(scope);
5623 }
5624
5625 fn clock_end(&mut self, scope: Scope) {
5626 self.operator.clock_end(scope);
5627 }
5628
5629 fn init(&mut self) {
5630 self.operator.init(&self.id);
5631 }
5632
5633 fn metadata(&self, output: &mut OperatorMeta) {
5634 self.operator.metadata(output);
5635 }
5636
5637 fn fixedpoint(&self, scope: Scope) -> bool {
5638 self.operator.fixedpoint(scope)
5639 }
5640
5641 fn checkpoint(
5642 &mut self,
5643 base: &StoragePath,
5644 files: &mut Vec<Arc<dyn FileCommitter>>,
5645 ) -> Result<(), DbspError> {
5646 self.operator
5647 .checkpoint(base, self.persistent_id().as_deref(), files)
5648 }
5649
5650 fn restore(&mut self, base: &StoragePath) -> Result<(), DbspError> {
5651 self.operator.restore(base, self.persistent_id().as_deref())
5652 }
5653
5654 fn start_compaction(&mut self) {
5655 self.operator.start_compaction()
5656 }
5657
5658 fn clear_state(&mut self) -> Result<(), DbspError> {
5659 self.operator.clear_state()
5660 }
5661
5662 fn start_replay(&mut self) -> Result<(), DbspError> {
5663 self.operator.start_replay()
5664 }
5665
5666 fn is_replay_complete(&self) -> bool {
5667 self.operator.is_replay_complete()
5668 }
5669
5670 fn end_replay(&mut self) -> Result<(), DbspError> {
5671 self.operator.end_replay()
5672 }
5673
5674 fn set_label(&mut self, key: &str, value: &str) {
5675 self.labels.insert(key.to_string(), value.to_string());
5676 }
5677
5678 fn get_label(&self, key: &str) -> Option<&str> {
5679 self.labels.get(key).map(|s| s.as_str())
5680 }
5681
5682 fn labels(&self) -> &BTreeMap<String, String> {
5683 &self.labels
5684 }
5685
5686 fn as_any(&self) -> &dyn Any {
5687 self
5688 }
5689}
5690
5691struct BinaryNode<C, I1, I2, O, Op> {
5692 id: GlobalNodeId,
5693 operator: Op,
5694 input_stream1: Stream<C, I1>,
5695 input_stream2: Stream<C, I2>,
5696 output_stream: Stream<C, O>,
5697 is_alias: bool,
5699 labels: BTreeMap<String, String>,
5700}
5701
5702impl<C, I1, I2, O, Op> BinaryNode<C, I1, I2, O, Op>
5703where
5704 Op: BinaryOperator<I1, I2, O>,
5705 C: Circuit,
5706{
5707 fn new(
5708 operator: Op,
5709 input_stream1: Stream<C, I1>,
5710 input_stream2: Stream<C, I2>,
5711 circuit: C,
5712 id: NodeId,
5713 ) -> Self {
5714 let is_alias = input_stream1.ptr_eq(&input_stream2);
5715 Self {
5716 id: circuit.global_node_id().child(id),
5717 operator,
5718 input_stream1,
5719 input_stream2,
5720 is_alias,
5721 output_stream: Stream::new(circuit, id),
5722 labels: BTreeMap::new(),
5723 }
5724 }
5725
5726 fn output_stream(&self) -> Stream<C, O> {
5727 self.output_stream.clone()
5728 }
5729}
5730
5731impl<C, I1, I2, O, Op> Node for BinaryNode<C, I1, I2, O, Op>
5732where
5733 C: Circuit,
5734 I1: Clone + 'static,
5735 I2: Clone + 'static,
5736 O: Clone + 'static,
5737 Op: BinaryOperator<I1, I2, O>,
5738{
5739 fn name(&self) -> Cow<'static, str> {
5740 self.operator.name()
5741 }
5742
5743 fn local_id(&self) -> NodeId {
5744 self.id.local_node_id().unwrap()
5745 }
5746
5747 fn global_id(&self) -> &GlobalNodeId {
5748 &self.id
5749 }
5750
5751 fn is_async(&self) -> bool {
5752 self.operator.is_async()
5753 }
5754
5755 fn is_input(&self) -> bool {
5756 self.operator.is_input()
5757 }
5758
5759 fn ready(&self) -> bool {
5760 self.operator.ready()
5761 }
5762
5763 fn register_ready_callback(&mut self, cb: Box<dyn Fn() + Send + Sync>) {
5764 self.operator.register_ready_callback(cb);
5765 }
5766
5767 #[allow(clippy::await_holding_refcell_ref)]
5769 fn eval<'a>(
5770 &'a mut self,
5771 ) -> Pin<Box<dyn Future<Output = Result<Option<Position>, SchedulerError>> + 'a>> {
5772 Box::pin(async {
5773 if self.is_alias {
5778 {
5779 let val1 = self.input_stream1.get();
5780 let val2 = self.input_stream2.get();
5781
5782 self.output_stream.put(
5783 self.operator
5784 .eval(StreamValue::peek(&val1), StreamValue::peek(&val2))
5785 .await,
5786 );
5787 }
5788 StreamValue::consume_token(self.input_stream1.val());
5791 StreamValue::consume_token(self.input_stream2.val());
5792 } else {
5793 let val1 = StreamValue::take(self.input_stream1.val());
5794 let val2 = StreamValue::take(self.input_stream2.val());
5795
5796 self.output_stream.put(match (val1, val2) {
5797 (Some(val1), Some(val2)) => self.operator.eval_owned(val1, val2).await,
5798 (Some(val1), None) => {
5799 self.operator
5800 .eval_owned_and_ref(val1, StreamValue::peek(&self.input_stream2.get()))
5801 .await
5802 }
5803 (None, Some(val2)) => {
5804 self.operator
5805 .eval_ref_and_owned(StreamValue::peek(&self.input_stream1.get()), val2)
5806 .await
5807 }
5808 (None, None) => {
5809 self.operator
5810 .eval(
5811 StreamValue::peek(&self.input_stream1.get()),
5812 StreamValue::peek(&self.input_stream2.get()),
5813 )
5814 .await
5815 }
5816 });
5817 StreamValue::consume_token(self.input_stream1.val());
5818 StreamValue::consume_token(self.input_stream2.val());
5819 }
5820 Ok(self.operator.flush_progress())
5821 })
5822 }
5823
5824 fn start_transaction(&mut self) {
5825 self.operator.start_transaction();
5826 }
5827
5828 fn flush(&mut self) {
5829 self.operator.flush();
5830 }
5831
5832 fn is_flush_complete(&self) -> bool {
5833 self.operator.is_flush_complete()
5834 }
5835
5836 fn clock_start(&mut self, scope: Scope) {
5837 self.operator.clock_start(scope);
5838 }
5839
5840 fn clock_end(&mut self, scope: Scope) {
5841 self.operator.clock_end(scope);
5842 }
5843
5844 fn init(&mut self) {
5845 self.operator.init(&self.id);
5846 }
5847
5848 fn metadata(&self, output: &mut OperatorMeta) {
5849 self.operator.metadata(output);
5850 }
5851
5852 fn fixedpoint(&self, scope: Scope) -> bool {
5853 self.operator.fixedpoint(scope)
5854 }
5855
5856 fn checkpoint(
5857 &mut self,
5858 base: &StoragePath,
5859 files: &mut Vec<Arc<dyn FileCommitter>>,
5860 ) -> Result<(), DbspError> {
5861 self.operator
5862 .checkpoint(base, self.persistent_id().as_deref(), files)
5863 }
5864
5865 fn restore(&mut self, base: &StoragePath) -> Result<(), DbspError> {
5866 self.operator.restore(base, self.persistent_id().as_deref())
5867 }
5868
5869 fn start_compaction(&mut self) {
5870 self.operator.start_compaction()
5871 }
5872
5873 fn clear_state(&mut self) -> Result<(), DbspError> {
5874 self.operator.clear_state()
5875 }
5876
5877 fn start_replay(&mut self) -> Result<(), DbspError> {
5878 self.operator.start_replay()
5879 }
5880
5881 fn is_replay_complete(&self) -> bool {
5882 self.operator.is_replay_complete()
5883 }
5884
5885 fn end_replay(&mut self) -> Result<(), DbspError> {
5886 self.operator.end_replay()
5887 }
5888
5889 fn set_label(&mut self, key: &str, value: &str) {
5890 self.labels.insert(key.to_string(), value.to_string());
5891 }
5892
5893 fn get_label(&self, key: &str) -> Option<&str> {
5894 self.labels.get(key).map(|s| s.as_str())
5895 }
5896
5897 fn labels(&self) -> &BTreeMap<String, String> {
5898 &self.labels
5899 }
5900
5901 fn as_any(&self) -> &dyn Any {
5902 self
5903 }
5904}
5905
5906struct TernaryNode<C, I1, I2, I3, O, Op> {
5907 id: GlobalNodeId,
5908 operator: Op,
5909 input_stream1: Stream<C, I1>,
5910 input_stream2: Stream<C, I2>,
5911 input_stream3: Stream<C, I3>,
5912 output_stream: Stream<C, O>,
5913 labels: BTreeMap<String, String>,
5914}
5915
5916impl<C, I1, I2, I3, O, Op> TernaryNode<C, I1, I2, I3, O, Op>
5917where
5918 I1: Clone,
5919 I2: Clone,
5920 I3: Clone,
5921 Op: TernaryOperator<I1, I2, I3, O>,
5922 C: Circuit,
5923{
5924 fn new(
5925 operator: Op,
5926 input_stream1: Stream<C, I1>,
5927 input_stream2: Stream<C, I2>,
5928 input_stream3: Stream<C, I3>,
5929 circuit: C,
5930 id: NodeId,
5931 ) -> Self {
5932 Self {
5933 id: circuit.global_node_id().child(id),
5934 operator,
5935 input_stream1,
5936 input_stream2,
5937 input_stream3,
5938 output_stream: Stream::new(circuit, id),
5941 labels: BTreeMap::new(),
5942 }
5943 }
5944
5945 fn output_stream(&self) -> Stream<C, O> {
5946 self.output_stream.clone()
5947 }
5948}
5949
5950impl<C, I1, I2, I3, O, Op> Node for TernaryNode<C, I1, I2, I3, O, Op>
5951where
5952 C: Circuit,
5953 I1: Clone + 'static,
5954 I2: Clone + 'static,
5955 I3: Clone + 'static,
5956 O: Clone + 'static,
5957 Op: TernaryOperator<I1, I2, I3, O>,
5958{
5959 fn name(&self) -> Cow<'static, str> {
5960 self.operator.name()
5961 }
5962
5963 fn local_id(&self) -> NodeId {
5964 self.id.local_node_id().unwrap()
5965 }
5966
5967 fn global_id(&self) -> &GlobalNodeId {
5968 &self.id
5969 }
5970
5971 fn is_async(&self) -> bool {
5972 self.operator.is_async()
5973 }
5974
5975 fn is_input(&self) -> bool {
5976 self.operator.is_input()
5977 }
5978
5979 fn ready(&self) -> bool {
5980 self.operator.ready()
5981 }
5982
5983 fn register_ready_callback(&mut self, cb: Box<dyn Fn() + Send + Sync>) {
5984 self.operator.register_ready_callback(cb);
5985 }
5986
5987 #[allow(clippy::await_holding_refcell_ref)]
5989 fn eval<'a>(
5990 &'a mut self,
5991 ) -> Pin<Box<dyn Future<Output = Result<Option<Position>, SchedulerError>> + 'a>> {
5992 Box::pin(async {
5993 {
5994 self.output_stream.put(
5995 self.operator
5996 .eval(
5997 Cow::Borrowed(StreamValue::peek(&self.input_stream1.get())),
5998 Cow::Borrowed(StreamValue::peek(&self.input_stream2.get())),
5999 Cow::Borrowed(StreamValue::peek(&self.input_stream3.get())),
6000 )
6001 .await,
6002 );
6003 }
6004
6005 StreamValue::consume_token(self.input_stream1.val());
6006 StreamValue::consume_token(self.input_stream2.val());
6007 StreamValue::consume_token(self.input_stream3.val());
6008
6009 Ok(self.operator.flush_progress())
6010 })
6011 }
6012
6013 fn start_transaction(&mut self) {
6014 self.operator.start_transaction();
6015 }
6016
6017 fn flush(&mut self) {
6018 self.operator.flush();
6019 }
6020
6021 fn is_flush_complete(&self) -> bool {
6022 self.operator.is_flush_complete()
6023 }
6024
6025 fn clock_start(&mut self, scope: Scope) {
6026 self.operator.clock_start(scope);
6027 }
6028
6029 fn clock_end(&mut self, scope: Scope) {
6030 self.operator.clock_end(scope);
6031 }
6032
6033 fn init(&mut self) {
6034 self.operator.init(&self.id);
6035 }
6036
6037 fn metadata(&self, output: &mut OperatorMeta) {
6038 self.operator.metadata(output);
6039 }
6040
6041 fn fixedpoint(&self, scope: Scope) -> bool {
6042 self.operator.fixedpoint(scope)
6043 }
6044
6045 fn checkpoint(
6046 &mut self,
6047 base: &StoragePath,
6048 files: &mut Vec<Arc<dyn FileCommitter>>,
6049 ) -> Result<(), DbspError> {
6050 self.operator
6051 .checkpoint(base, self.persistent_id().as_deref(), files)
6052 }
6053
6054 fn restore(&mut self, base: &StoragePath) -> Result<(), DbspError> {
6055 self.operator.restore(base, self.persistent_id().as_deref())
6056 }
6057
6058 fn start_compaction(&mut self) {
6059 self.operator.start_compaction()
6060 }
6061
6062 fn clear_state(&mut self) -> Result<(), DbspError> {
6063 self.operator.clear_state()
6064 }
6065
6066 fn start_replay(&mut self) -> Result<(), DbspError> {
6067 self.operator.start_replay()
6068 }
6069
6070 fn is_replay_complete(&self) -> bool {
6071 self.operator.is_replay_complete()
6072 }
6073
6074 fn end_replay(&mut self) -> Result<(), DbspError> {
6075 self.operator.end_replay()
6076 }
6077
6078 fn set_label(&mut self, key: &str, value: &str) {
6079 self.labels.insert(key.to_string(), value.to_string());
6080 }
6081
6082 fn get_label(&self, key: &str) -> Option<&str> {
6083 self.labels.get(key).map(|s| s.as_str())
6084 }
6085
6086 fn labels(&self) -> &BTreeMap<String, String> {
6087 &self.labels
6088 }
6089
6090 fn as_any(&self) -> &dyn Any {
6091 self
6092 }
6093}
6094
6095struct QuaternaryNode<C, I1, I2, I3, I4, O, Op> {
6096 id: GlobalNodeId,
6097 operator: Op,
6098 input_stream1: Stream<C, I1>,
6099 input_stream2: Stream<C, I2>,
6100 input_stream3: Stream<C, I3>,
6101 input_stream4: Stream<C, I4>,
6102 output_stream: Stream<C, O>,
6103 labels: BTreeMap<String, String>,
6104 }
6112
6113impl<C, I1, I2, I3, I4, O, Op> QuaternaryNode<C, I1, I2, I3, I4, O, Op>
6114where
6115 I1: Clone,
6116 I2: Clone,
6117 I3: Clone,
6118 I4: Clone,
6119 Op: QuaternaryOperator<I1, I2, I3, I4, O>,
6120 C: Circuit,
6121{
6122 fn new(
6123 operator: Op,
6124 input_stream1: Stream<C, I1>,
6125 input_stream2: Stream<C, I2>,
6126 input_stream3: Stream<C, I3>,
6127 input_stream4: Stream<C, I4>,
6128 circuit: C,
6129 id: NodeId,
6130 ) -> Self {
6131 Self {
6138 id: circuit.global_node_id().child(id),
6139 operator,
6140 input_stream1,
6141 input_stream2,
6142 input_stream3,
6143 input_stream4,
6144 output_stream: Stream::new(circuit, id),
6148 labels: BTreeMap::new(),
6149 }
6150 }
6151
6152 fn output_stream(&self) -> Stream<C, O> {
6153 self.output_stream.clone()
6154 }
6155}
6156
6157impl<C, I1, I2, I3, I4, O, Op> Node for QuaternaryNode<C, I1, I2, I3, I4, O, Op>
6158where
6159 C: Circuit,
6160 I1: Clone + 'static,
6161 I2: Clone + 'static,
6162 I3: Clone + 'static,
6163 I4: Clone + 'static,
6164 O: Clone + 'static,
6165 Op: QuaternaryOperator<I1, I2, I3, I4, O>,
6166{
6167 fn name(&self) -> Cow<'static, str> {
6168 self.operator.name()
6169 }
6170
6171 fn local_id(&self) -> NodeId {
6172 self.id.local_node_id().unwrap()
6173 }
6174
6175 fn global_id(&self) -> &GlobalNodeId {
6176 &self.id
6177 }
6178
6179 fn is_async(&self) -> bool {
6180 self.operator.is_async()
6181 }
6182
6183 fn is_input(&self) -> bool {
6184 self.operator.is_input()
6185 }
6186
6187 fn ready(&self) -> bool {
6188 self.operator.ready()
6189 }
6190
6191 fn register_ready_callback(&mut self, cb: Box<dyn Fn() + Send + Sync>) {
6192 self.operator.register_ready_callback(cb);
6193 }
6194
6195 #[allow(clippy::await_holding_refcell_ref)]
6197 fn eval<'a>(
6198 &'a mut self,
6199 ) -> Pin<Box<dyn Future<Output = Result<Option<Position>, SchedulerError>> + 'a>> {
6200 Box::pin(async {
6201 {
6202 self.output_stream.put(
6203 self.operator
6204 .eval(
6205 Cow::Borrowed(StreamValue::peek(&self.input_stream1.get())),
6206 Cow::Borrowed(StreamValue::peek(&self.input_stream2.get())),
6207 Cow::Borrowed(StreamValue::peek(&self.input_stream3.get())),
6208 Cow::Borrowed(StreamValue::peek(&self.input_stream4.get())),
6209 )
6210 .await,
6211 );
6212 }
6213
6214 StreamValue::consume_token(self.input_stream1.val());
6215 StreamValue::consume_token(self.input_stream2.val());
6216 StreamValue::consume_token(self.input_stream3.val());
6217 StreamValue::consume_token(self.input_stream4.val());
6218
6219 Ok(self.operator.flush_progress())
6220 })
6221 }
6222
6223 fn start_transaction(&mut self) {
6224 self.operator.start_transaction();
6225 }
6226
6227 fn flush(&mut self) {
6228 self.operator.flush();
6229 }
6230
6231 fn is_flush_complete(&self) -> bool {
6232 self.operator.is_flush_complete()
6233 }
6234
6235 fn clock_start(&mut self, scope: Scope) {
6236 self.operator.clock_start(scope);
6237 }
6238
6239 fn clock_end(&mut self, scope: Scope) {
6240 self.operator.clock_end(scope);
6241 }
6242
6243 fn init(&mut self) {
6244 self.operator.init(&self.id);
6245 }
6246
6247 fn metadata(&self, output: &mut OperatorMeta) {
6248 self.operator.metadata(output);
6249 }
6250
6251 fn fixedpoint(&self, scope: Scope) -> bool {
6252 self.operator.fixedpoint(scope)
6253 }
6254
6255 fn checkpoint(
6256 &mut self,
6257 base: &StoragePath,
6258 files: &mut Vec<Arc<dyn FileCommitter>>,
6259 ) -> Result<(), DbspError> {
6260 self.operator
6261 .checkpoint(base, self.persistent_id().as_deref(), files)
6262 }
6263
6264 fn restore(&mut self, base: &StoragePath) -> Result<(), DbspError> {
6265 self.operator.restore(base, self.persistent_id().as_deref())
6266 }
6267
6268 fn start_compaction(&mut self) {
6269 self.operator.start_compaction()
6270 }
6271
6272 fn clear_state(&mut self) -> Result<(), DbspError> {
6273 self.operator.clear_state()
6274 }
6275
6276 fn start_replay(&mut self) -> Result<(), DbspError> {
6277 self.operator.start_replay()
6278 }
6279
6280 fn is_replay_complete(&self) -> bool {
6281 self.operator.is_replay_complete()
6282 }
6283
6284 fn end_replay(&mut self) -> Result<(), DbspError> {
6285 self.operator.end_replay()
6286 }
6287
6288 fn set_label(&mut self, key: &str, value: &str) {
6289 self.labels.insert(key.to_string(), value.to_string());
6290 }
6291
6292 fn get_label(&self, key: &str) -> Option<&str> {
6293 self.labels.get(key).map(|s| s.as_str())
6294 }
6295
6296 fn labels(&self) -> &BTreeMap<String, String> {
6297 &self.labels
6298 }
6299
6300 fn as_any(&self) -> &dyn Any {
6301 self
6302 }
6303}
6304
6305struct NaryNode<C, I, O, Op>
6306where
6307 I: Clone + 'static,
6308{
6309 id: GlobalNodeId,
6310 operator: Op,
6311 input_streams: Vec<Stream<C, I>>,
6314 output_stream: Stream<C, O>,
6317 labels: BTreeMap<String, String>,
6318}
6319
6320impl<C, I, O, Op> NaryNode<C, I, O, Op>
6321where
6322 I: Clone + 'static,
6323 Op: NaryOperator<I, O>,
6324 C: Circuit,
6325{
6326 fn new<Iter>(operator: Op, input_streams: Iter, circuit: C, id: NodeId) -> Self
6327 where
6328 Iter: IntoIterator<Item = Stream<C, I>>,
6329 {
6330 let mut input_streams: Vec<_> = input_streams.into_iter().collect();
6331 input_streams.shrink_to_fit();
6343 Self {
6344 id: circuit.global_node_id().child(id),
6345 operator,
6346 input_streams,
6347 output_stream: Stream::new(circuit, id),
6349 labels: BTreeMap::new(),
6350 }
6351 }
6352
6353 fn output_stream(&self) -> Stream<C, O> {
6354 self.output_stream.clone()
6355 }
6356}
6357
6358impl<C, I, O, Op> Node for NaryNode<C, I, O, Op>
6359where
6360 C: Circuit,
6361 I: Clone,
6362 O: Clone + 'static,
6363 Op: NaryOperator<I, O>,
6364{
6365 fn name(&self) -> Cow<'static, str> {
6366 self.operator.name()
6367 }
6368
6369 fn local_id(&self) -> NodeId {
6370 self.id.local_node_id().unwrap()
6371 }
6372
6373 fn global_id(&self) -> &GlobalNodeId {
6374 &self.id
6375 }
6376
6377 fn is_async(&self) -> bool {
6378 self.operator.is_async()
6379 }
6380
6381 fn is_input(&self) -> bool {
6382 self.operator.is_input()
6383 }
6384
6385 fn ready(&self) -> bool {
6386 self.operator.ready()
6387 }
6388
6389 fn register_ready_callback(&mut self, cb: Box<dyn Fn() + Send + Sync>) {
6390 self.operator.register_ready_callback(cb);
6391 }
6392
6393 fn eval<'a>(
6394 &'a mut self,
6395 ) -> Pin<Box<dyn Future<Output = Result<Option<Position>, SchedulerError>> + 'a>> {
6396 Box::pin(async {
6397 let refs = self
6398 .input_streams
6399 .iter()
6400 .map(|stream| stream.get())
6401 .collect::<Vec<_>>();
6402
6403 self.output_stream.put(
6404 self.operator
6405 .eval(refs.iter().map(|r| Cow::Borrowed(StreamValue::peek(r))))
6406 .await,
6407 );
6408
6409 std::mem::drop(refs);
6410
6411 for i in self.input_streams.iter() {
6412 StreamValue::consume_token(i.val());
6413 }
6414 Ok(self.operator.flush_progress())
6415 })
6416 }
6417
6418 fn start_transaction(&mut self) {
6419 self.operator.start_transaction();
6420 }
6421
6422 fn flush(&mut self) {
6423 self.operator.flush();
6424 }
6425
6426 fn is_flush_complete(&self) -> bool {
6427 self.operator.is_flush_complete()
6428 }
6429
6430 fn clock_start(&mut self, scope: Scope) {
6431 self.operator.clock_start(scope);
6432 }
6433
6434 fn clock_end(&mut self, scope: Scope) {
6435 self.operator.clock_end(scope);
6436 }
6437
6438 fn init(&mut self) {
6439 self.operator.init(&self.id);
6440 }
6441
6442 fn metadata(&self, output: &mut OperatorMeta) {
6443 self.operator.metadata(output);
6444 }
6445
6446 fn fixedpoint(&self, scope: Scope) -> bool {
6447 self.operator.fixedpoint(scope)
6448 }
6449
6450 fn checkpoint(
6451 &mut self,
6452 base: &StoragePath,
6453 files: &mut Vec<Arc<dyn FileCommitter>>,
6454 ) -> Result<(), DbspError> {
6455 self.operator
6456 .checkpoint(base, self.persistent_id().as_deref(), files)
6457 }
6458
6459 fn restore(&mut self, base: &StoragePath) -> Result<(), DbspError> {
6460 self.operator.restore(base, self.persistent_id().as_deref())
6461 }
6462
6463 fn start_compaction(&mut self) {
6464 self.operator.start_compaction()
6465 }
6466
6467 fn clear_state(&mut self) -> Result<(), DbspError> {
6468 self.operator.clear_state()
6469 }
6470
6471 fn start_replay(&mut self) -> Result<(), DbspError> {
6472 self.operator.start_replay()
6473 }
6474
6475 fn is_replay_complete(&self) -> bool {
6476 self.operator.is_replay_complete()
6477 }
6478
6479 fn end_replay(&mut self) -> Result<(), DbspError> {
6480 self.operator.end_replay()
6481 }
6482
6483 fn set_label(&mut self, key: &str, value: &str) {
6484 self.labels.insert(key.to_string(), value.to_string());
6485 }
6486
6487 fn get_label(&self, key: &str) -> Option<&str> {
6488 self.labels.get(key).map(|s| s.as_str())
6489 }
6490
6491 fn labels(&self) -> &BTreeMap<String, String> {
6492 &self.labels
6493 }
6494
6495 fn as_any(&self) -> &dyn Any {
6496 self
6497 }
6498}
6499
6500struct FeedbackOutputNode<C, I, O, Op>
6506where
6507 C: Circuit,
6508{
6509 id: GlobalNodeId,
6510 operator: Rc<RefCell<Op>>,
6511 output_stream: Stream<C, O>,
6512 export_stream: Option<Stream<C::Parent, O>>,
6513 phantom_input: PhantomData<I>,
6514 labels: BTreeMap<String, String>,
6515}
6516
6517impl<C, I, O, Op> FeedbackOutputNode<C, I, O, Op>
6518where
6519 C: Circuit,
6520 Op: StrictUnaryOperator<I, O>,
6521{
6522 fn new(operator: Rc<RefCell<Op>>, circuit: C, id: NodeId) -> Self {
6523 Self {
6524 id: circuit.global_node_id().child(id),
6525 operator,
6526 output_stream: Stream::new(circuit.clone(), id),
6527 export_stream: None,
6528 phantom_input: PhantomData,
6529 labels: BTreeMap::new(),
6530 }
6531 }
6532
6533 fn with_export(operator: Rc<RefCell<Op>>, circuit: C, id: NodeId) -> Self {
6534 let mut result = Self::new(operator, circuit.clone(), id);
6535 result.export_stream = Some(Stream::with_origin(
6536 circuit.parent(),
6537 circuit.allocate_stream_id(),
6538 circuit.node_id(),
6539 GlobalNodeId::child_of(&circuit, id),
6540 ));
6541 result
6542 }
6543
6544 fn output_stream(&self) -> Stream<C, O> {
6545 self.output_stream.clone()
6546 }
6547}
6548
6549impl<C, I, O, Op> Node for FeedbackOutputNode<C, I, O, Op>
6550where
6551 C: Circuit,
6552 I: Data,
6553 O: Clone + 'static,
6554 Op: StrictUnaryOperator<I, O>,
6555{
6556 fn local_id(&self) -> NodeId {
6557 self.id.local_node_id().unwrap()
6558 }
6559
6560 fn global_id(&self) -> &GlobalNodeId {
6561 &self.id
6562 }
6563
6564 fn name(&self) -> Cow<'static, str> {
6565 self.operator.borrow().name()
6566 }
6567
6568 fn is_async(&self) -> bool {
6569 self.operator.borrow().is_async()
6570 }
6571
6572 fn is_input(&self) -> bool {
6573 self.operator.borrow().is_input()
6574 }
6575
6576 fn ready(&self) -> bool {
6577 self.operator.borrow().ready()
6578 }
6579
6580 fn register_ready_callback(&mut self, cb: Box<dyn Fn() + Send + Sync>) {
6581 self.operator.borrow_mut().register_ready_callback(cb);
6582 }
6583
6584 fn eval<'a>(
6585 &'a mut self,
6586 ) -> Pin<Box<dyn Future<Output = Result<Option<Position>, SchedulerError>> + 'a>> {
6587 Box::pin(async {
6588 self.output_stream
6589 .put(self.operator.borrow_mut().get_output());
6590 Ok(None)
6591 })
6592 }
6593
6594 fn start_transaction(&mut self) {
6595 self.operator.borrow_mut().start_transaction();
6596 }
6597
6598 fn flush(&mut self) {
6599 self.operator.borrow_mut().flush();
6600 }
6601
6602 fn is_flush_complete(&self) -> bool {
6603 self.operator.borrow().is_flush_complete()
6604 }
6605
6606 fn clock_start(&mut self, scope: Scope) {
6607 self.operator.borrow_mut().clock_start(scope)
6608 }
6609
6610 fn clock_end(&mut self, scope: Scope) {
6611 if scope == 0
6612 && let Some(export_stream) = &mut self.export_stream
6613 {
6614 export_stream.put(self.operator.borrow_mut().get_final_output());
6615 }
6616 self.operator.borrow_mut().clock_end(scope);
6617 }
6618
6619 fn init(&mut self) {
6620 self.operator.borrow_mut().init(&self.id);
6621 }
6622
6623 fn metadata(&self, _output: &mut OperatorMeta) {
6624 }
6627
6628 fn fixedpoint(&self, scope: Scope) -> bool {
6629 self.operator.borrow().fixedpoint(scope)
6630 }
6631
6632 fn checkpoint(
6633 &mut self,
6634 base: &StoragePath,
6635 files: &mut Vec<Arc<dyn FileCommitter>>,
6636 ) -> Result<(), DbspError> {
6637 self.operator
6638 .borrow_mut()
6639 .checkpoint(base, self.persistent_id().as_deref(), files)
6640 }
6641
6642 fn restore(&mut self, base: &StoragePath) -> Result<(), DbspError> {
6643 self.operator
6644 .borrow_mut()
6645 .restore(base, self.persistent_id().as_deref())
6646 }
6647
6648 fn start_compaction(&mut self) {
6649 self.operator.borrow_mut().start_compaction()
6650 }
6651
6652 fn clear_state(&mut self) -> Result<(), DbspError> {
6653 self.operator.borrow_mut().clear_state()
6654 }
6655
6656 fn start_replay(&mut self) -> Result<(), DbspError> {
6657 self.operator.borrow_mut().start_replay()
6658 }
6659
6660 fn is_replay_complete(&self) -> bool {
6661 self.operator.borrow().is_replay_complete()
6662 }
6663
6664 fn end_replay(&mut self) -> Result<(), DbspError> {
6665 self.operator.borrow_mut().end_replay()
6666 }
6667
6668 fn set_label(&mut self, key: &str, value: &str) {
6669 self.labels.insert(key.to_string(), value.to_string());
6670 }
6671
6672 fn get_label(&self, key: &str) -> Option<&str> {
6673 self.labels.get(key).map(|s| s.as_str())
6674 }
6675
6676 fn labels(&self) -> &BTreeMap<String, String> {
6677 &self.labels
6678 }
6679
6680 fn as_any(&self) -> &dyn Any {
6681 self
6682 }
6683}
6684
6685struct FeedbackInputNode<C, I, O, Op> {
6687 id: GlobalNodeId,
6689 operator: Rc<RefCell<Op>>,
6690 input_stream: Stream<C, I>,
6691 phantom_output: PhantomData<O>,
6692 labels: BTreeMap<String, String>,
6693}
6694
6695impl<C, I, O, Op> FeedbackInputNode<C, I, O, Op>
6696where
6697 Op: StrictUnaryOperator<I, O>,
6698 C: Circuit,
6699{
6700 fn new(operator: Rc<RefCell<Op>>, input_stream: Stream<C, I>, id: NodeId) -> Self {
6701 Self {
6702 id: input_stream.circuit().global_node_id().child(id),
6703 operator,
6704 input_stream,
6705 phantom_output: PhantomData,
6706 labels: BTreeMap::new(),
6707 }
6708 }
6709}
6710
6711impl<C, I, O, Op> Node for FeedbackInputNode<C, I, O, Op>
6712where
6713 Op: StrictUnaryOperator<I, O>,
6714 I: Data,
6715 O: 'static,
6716 C: Clone + 'static,
6717{
6718 fn name(&self) -> Cow<'static, str> {
6719 self.operator.borrow().name()
6720 }
6721
6722 fn local_id(&self) -> NodeId {
6723 self.id.local_node_id().unwrap()
6724 }
6725
6726 fn global_id(&self) -> &GlobalNodeId {
6727 &self.id
6728 }
6729
6730 fn is_async(&self) -> bool {
6731 self.operator.borrow().is_async()
6732 }
6733
6734 fn is_input(&self) -> bool {
6735 self.operator.borrow().is_input()
6736 }
6737
6738 fn ready(&self) -> bool {
6739 self.operator.borrow().ready()
6740 }
6741
6742 fn register_ready_callback(&mut self, cb: Box<dyn Fn() + Send + Sync>) {
6743 self.operator.borrow_mut().register_ready_callback(cb);
6744 }
6745
6746 #[allow(clippy::await_holding_refcell_ref)]
6748 fn eval<'a>(
6749 &'a mut self,
6750 ) -> Pin<Box<dyn Future<Output = Result<Option<Position>, SchedulerError>> + 'a>> {
6751 Box::pin(async {
6752 match StreamValue::take(self.input_stream.val()) {
6753 Some(v) => self.operator.borrow_mut().eval_strict_owned(v).await,
6754 None => {
6755 self.operator
6756 .borrow_mut()
6757 .eval_strict(StreamValue::peek(&self.input_stream.get()))
6758 .await
6759 }
6760 };
6761
6762 StreamValue::consume_token(self.input_stream.val());
6763
6764 Ok(None)
6765 })
6766 }
6767
6768 fn start_transaction(&mut self) {
6769 self.operator.borrow_mut().start_transaction();
6770 }
6771
6772 fn flush(&mut self) {
6773 self.operator.borrow_mut().flush_input();
6774 }
6775
6776 fn is_flush_complete(&self) -> bool {
6777 self.operator.borrow().is_flush_input_complete()
6778 }
6779
6780 fn clock_start(&mut self, _scope: Scope) {}
6783
6784 fn clock_end(&mut self, _scope: Scope) {}
6785
6786 fn init(&mut self) {
6787 self.operator.borrow_mut().init(&self.id);
6788 }
6789
6790 fn metadata(&self, output: &mut OperatorMeta) {
6791 self.operator.borrow().metadata(output)
6792 }
6793
6794 fn fixedpoint(&self, scope: Scope) -> bool {
6795 self.operator.borrow().fixedpoint(scope)
6796 }
6797
6798 fn checkpoint(
6799 &mut self,
6800 _base: &StoragePath,
6801 _files: &mut Vec<Arc<dyn FileCommitter>>,
6802 ) -> Result<(), DbspError> {
6803 Ok(())
6810 }
6811
6812 fn restore(&mut self, _base: &StoragePath) -> Result<(), DbspError> {
6813 Ok(())
6815 }
6816
6817 fn start_compaction(&mut self) {}
6818
6819 fn clear_state(&mut self) -> Result<(), DbspError> {
6820 Ok(())
6821 }
6822
6823 fn start_replay(&mut self) -> Result<(), DbspError> {
6824 self.operator.borrow_mut().start_replay()
6825 }
6826
6827 fn is_replay_complete(&self) -> bool {
6828 self.operator.borrow().is_replay_complete()
6829 }
6830
6831 fn end_replay(&mut self) -> Result<(), DbspError> {
6832 self.operator.borrow_mut().end_replay()
6833 }
6834
6835 fn set_label(&mut self, key: &str, value: &str) {
6836 self.labels.insert(key.to_string(), value.to_string());
6837 }
6838
6839 fn get_label(&self, key: &str) -> Option<&str> {
6840 self.labels.get(key).map(|s| s.as_str())
6841 }
6842
6843 fn labels(&self) -> &BTreeMap<String, String> {
6844 &self.labels
6845 }
6846
6847 fn as_any(&self) -> &dyn Any {
6848 self
6849 }
6850}
6851
6852pub struct FeedbackConnector<C, I, O, Op> {
6860 output_node_id: NodeId,
6861 circuit: C,
6862 operator: Rc<RefCell<Op>>,
6863 phantom_input: PhantomData<I>,
6864 phantom_output: PhantomData<O>,
6865}
6866
6867impl<C, I, O, Op> FeedbackConnector<C, I, O, Op>
6868where
6869 Op: StrictUnaryOperator<I, O>,
6870{
6871 fn new(output_node_id: NodeId, circuit: C, operator: Rc<RefCell<Op>>) -> Self {
6872 Self {
6873 output_node_id,
6874 circuit,
6875 operator,
6876 phantom_input: PhantomData,
6877 phantom_output: PhantomData,
6878 }
6879 }
6880}
6881
6882impl<C, I, O, Op> FeedbackConnector<C, I, O, Op>
6883where
6884 Op: StrictUnaryOperator<I, O>,
6885 I: Data,
6886 O: Data,
6887 C: Circuit,
6888{
6889 pub fn operator_mut(&self) -> RefMut<'_, Op> {
6890 self.operator.borrow_mut()
6891 }
6892
6893 pub fn connect(self, input_stream: &Stream<C, I>) {
6898 self.connect_with_preference(input_stream, OwnershipPreference::INDIFFERENT)
6899 }
6900
6901 pub fn connect_with_preference(
6902 self,
6903 input_stream: &Stream<C, I>,
6904 input_preference: OwnershipPreference,
6905 ) {
6906 self.circuit.connect_feedback_with_preference(
6907 self.output_node_id,
6908 self.operator,
6909 input_stream,
6910 input_preference,
6911 )
6912 }
6913}
6914
6915struct ChildNode<C>
6917where
6918 C: Circuit,
6919{
6920 id: GlobalNodeId,
6921 circuit: C,
6922 executor: Box<dyn Executor<C>>,
6923 labels: BTreeMap<String, String>,
6924 nesting_depth: Scope,
6925}
6926
6927impl<C> Drop for ChildNode<C>
6928where
6929 C: Circuit,
6930{
6931 fn drop(&mut self) {
6932 self.circuit.clear();
6935 }
6936}
6937
6938impl<C> ChildNode<C>
6939where
6940 C: Circuit,
6941{
6942 fn new<E>(circuit: C, nesting_depth: Scope, executor: E) -> Self
6943 where
6944 E: Executor<C>,
6945 {
6946 Self {
6947 id: circuit.global_node_id(),
6948 circuit,
6949 executor: Box::new(executor) as Box<dyn Executor<C>>,
6950 labels: BTreeMap::new(),
6951 nesting_depth,
6952 }
6953 }
6954}
6955
6956impl<C> Node for ChildNode<C>
6957where
6958 C: Circuit,
6959{
6960 fn name(&self) -> Cow<'static, str> {
6961 Cow::Borrowed("Subcircuit")
6962 }
6963
6964 fn local_id(&self) -> NodeId {
6965 self.id.local_node_id().unwrap()
6966 }
6967
6968 fn global_id(&self) -> &GlobalNodeId {
6969 &self.id
6970 }
6971
6972 fn is_circuit(&self) -> bool {
6973 true
6974 }
6975
6976 fn is_async(&self) -> bool {
6977 false
6978 }
6979
6980 fn is_input(&self) -> bool {
6981 false
6982 }
6983
6984 fn ready(&self) -> bool {
6985 true
6986 }
6987
6988 fn eval<'a>(
6989 &'a mut self,
6990 ) -> Pin<Box<dyn Future<Output = Result<Option<Position>, SchedulerError>> + 'a>> {
6991 Box::pin(async {
6992 for node_id in self.circuit.import_nodes() {
6995 self.circuit.eval_import_node(node_id).await;
6996 }
6997 self.executor.transaction(&self.circuit).await?;
6998 Ok(None)
6999 })
7000 }
7001
7002 fn start_transaction(&mut self) {
7003 }
7005
7006 fn flush(&mut self) {
7007 self.executor.flush();
7008 }
7009
7010 fn is_flush_complete(&self) -> bool {
7011 self.executor.is_flush_complete()
7012 }
7013
7014 fn clock_start(&mut self, scope: Scope) {
7015 self.circuit.clock_start(scope + self.nesting_depth);
7016 }
7017
7018 fn clock_end(&mut self, scope: Scope) {
7019 self.circuit.clock_end(scope + self.nesting_depth);
7020 }
7021
7022 fn metadata(&self, _meta: &mut OperatorMeta) {}
7023
7024 fn fixedpoint(&self, scope: Scope) -> bool {
7025 self.circuit.check_fixedpoint(scope + self.nesting_depth)
7026 }
7027
7028 fn map_nodes_recursive(
7029 &self,
7030 f: &mut dyn FnMut(&dyn Node) -> Result<(), DbspError>,
7031 ) -> Result<(), DbspError> {
7032 self.circuit.map_nodes_recursive(f)
7033 }
7034
7035 fn checkpoint(
7036 &mut self,
7037 _base: &StoragePath,
7038 _files: &mut Vec<Arc<dyn FileCommitter>>,
7039 ) -> Result<(), DbspError> {
7040 Ok(())
7041 }
7042
7043 fn restore(&mut self, _base: &StoragePath) -> Result<(), DbspError> {
7044 Ok(())
7045 }
7046
7047 fn start_compaction(&mut self) {
7048 self.circuit.start_compaction();
7049 }
7050
7051 fn clear_state(&mut self) -> Result<(), DbspError> {
7052 self.circuit
7053 .map_local_nodes_mut(&mut |node| node.clear_state())
7054 }
7055
7056 fn start_replay(&mut self) -> Result<(), DbspError> {
7057 Ok(())
7058 }
7059
7060 fn is_replay_complete(&self) -> bool {
7061 true
7062 }
7063
7064 fn end_replay(&mut self) -> Result<(), DbspError> {
7065 Ok(())
7066 }
7067
7068 fn set_label(&mut self, key: &str, value: &str) {
7069 self.labels.insert(key.to_string(), value.to_string());
7070 }
7071
7072 fn get_label(&self, key: &str) -> Option<&str> {
7073 self.labels.get(key).map(|s| s.as_str())
7074 }
7075
7076 fn labels(&self) -> &BTreeMap<String, String> {
7077 &self.labels
7078 }
7079
7080 fn map_child(&self, path: &[NodeId], f: &mut dyn FnMut(&dyn Node)) {
7081 self.circuit.map_node_relative(path, f);
7082 }
7083
7084 fn map_child_mut(&self, path: &[NodeId], f: &mut dyn FnMut(&mut dyn Node)) {
7085 self.circuit.map_node_mut_relative(path, f);
7086 }
7087
7088 fn as_any(&self) -> &dyn Any {
7089 self
7090 }
7091
7092 fn as_circuit(&self) -> Option<&dyn CircuitBase> {
7093 Some(&self.circuit)
7094 }
7095}
7096
7097pub struct CircuitHandle {
7103 circuit: RootCircuit,
7104 executor: Box<dyn Executor<RootCircuit>>,
7105 tokio_runtime: TokioRuntime,
7106 replay_info: Option<BootstrapInfo>,
7107}
7108
7109impl Drop for CircuitHandle {
7110 fn drop(&mut self) {
7111 self.circuit
7112 .log_scheduler_event(&SchedulerEvent::clock_end());
7113
7114 if !panicking() {
7118 self.circuit.clock_end(0)
7119 }
7120
7121 self.circuit.clear();
7126 }
7127}
7128
7129#[derive(Debug, Clone, PartialEq, Eq, Serialize)]
7131pub struct BootstrapInfo {
7132 pub replay_sources: BTreeMap<NodeId, StreamId>,
7134
7135 #[allow(dead_code)]
7137 pub need_backfill: BTreeMap<NodeId, Option<String>>,
7138}
7139
7140impl CircuitHandle {
7141 pub fn transaction(&self) -> Result<(), DbspError> {
7143 self.tokio_runtime
7144 .block_on(async {
7145 let local_set = LocalSet::new();
7146 local_set
7147 .run_until(async { self.executor.transaction(&self.circuit).await })
7148 .await
7149 })
7150 .map_err(DbspError::Scheduler)
7151 }
7152
7153 pub fn start_transaction(&self) -> Result<(), DbspError> {
7158 self.tokio_runtime
7159 .block_on(async {
7160 let local_set = LocalSet::new();
7161 local_set
7162 .run_until(async { self.executor.start_transaction(&self.circuit).await })
7163 .await
7164 })
7165 .map_err(DbspError::Scheduler)
7166 }
7167
7168 pub fn start_commit_transaction(&self) -> Result<(), DbspError> {
7173 self.executor
7174 .start_commit_transaction()
7175 .map_err(DbspError::Scheduler)
7176 }
7177
7178 pub fn is_commit_complete(&self) -> bool {
7179 self.executor.is_commit_complete()
7180 }
7181
7182 pub fn commit_progress(&self) -> CommitProgress {
7183 self.executor.commit_progress()
7184 }
7185
7186 pub fn step(&self) -> Result<(), DbspError> {
7188 self.tokio_runtime
7189 .block_on(async {
7190 let local_set = LocalSet::new();
7191 local_set
7192 .run_until(async { self.executor.step(&self.circuit).await })
7193 .await
7194 })
7195 .map_err(DbspError::Scheduler)
7196 }
7197
7198 pub fn checkpoint(
7199 &mut self,
7200 base: &StoragePath,
7201 files: &mut Vec<Arc<dyn FileCommitter>>,
7202 ) -> Result<(), DbspError> {
7203 self.circuit
7236 .map_nodes_recursive_mut(&mut |node: &mut dyn Node| {
7237 let _span = Span::new("operator")
7238 .with_category("Checkpoint")
7239 .with_tooltip(|| format!("{} {}", node.name(), node.global_id()));
7240 DBSP_OPERATOR_COMMIT_LATENCY_MICROSECONDS
7241 .record_callback(|| node.checkpoint(base, files))
7242 })
7243 }
7244
7245 pub fn restore(&mut self, base: &StoragePath) -> Result<Option<BootstrapInfo>, DbspError> {
7268 let mut replay_sources: BTreeMap<NodeId, StreamId> = BTreeMap::new();
7270
7271 let mut need_backfill: BTreeSet<GlobalNodeId> = BTreeSet::new();
7273
7274 if Runtime::mode() == Mode::Persistent {
7282 let backend = Runtime::storage_backend()?;
7283 crate::circuit::checkpointer::Checkpointer::verify_checkpoint_intact(
7284 backend.as_ref(),
7285 base,
7286 )?;
7287 }
7288
7289 self.circuit.map_nodes_recursive_mut(
7294 &mut |node: &mut dyn Node| match node.restore(base) {
7295 Err(e) if Runtime::mode() == Mode::Ephemeral => Err(e),
7296 Err(DbspError::Storage(ioerror)) if ioerror.kind() == ErrorKind::NotFound => {
7297 need_backfill.insert(node.global_id().clone());
7298 Ok(())
7299 }
7300 Err(DbspError::IO(ioerror)) if ioerror.kind() == ErrorKind::NotFound => {
7301 need_backfill.insert(node.global_id().clone());
7302 Ok(())
7303 }
7304 Err(e) => Err(e),
7305 Ok(()) => Ok(()),
7306 },
7307 )?;
7308
7309 let additional_need_backfill: BTreeSet<GlobalNodeId> =
7311 self.invalidate_balancer_clusters(&need_backfill);
7312 if Runtime::worker_index() == 0 {
7313 debug!(
7314 "CircuitHandle::restore: additional need backfill: {:?}",
7315 additional_need_backfill
7316 );
7317 }
7318 need_backfill.extend(additional_need_backfill);
7319
7320 debug!(
7321 "worker {}: CircuitHandle::restore: found {} operators that require backfill: {:?}",
7322 Runtime::worker_index(),
7323 need_backfill.len(),
7324 need_backfill.iter().cloned().collect::<Vec<GlobalNodeId>>()
7325 );
7326
7327 let need_backfill = need_backfill
7331 .into_iter()
7332 .map(|gid| gid.top_level_ancestor())
7333 .collect::<BTreeSet<_>>();
7334
7335 let mut participate_in_backfill = need_backfill.clone();
7341
7342 let mut participate_in_backfill_new = need_backfill.clone();
7344
7345 while !participate_in_backfill_new.is_empty() {
7346 participate_in_backfill_new = self.compute_replay_nodes_step(
7347 &mut replay_sources,
7348 &need_backfill,
7349 participate_in_backfill_new,
7350 &mut participate_in_backfill,
7351 )?;
7352 }
7353
7354 debug!(
7355 "worker {}: CircuitHandle::restore: replaying {} operators: {:?}\n backfilling {} operators: {:?}\n replay circuit consists of {} operators: {:?}",
7356 Runtime::worker_index(),
7357 replay_sources.len(),
7358 replay_sources.keys().cloned().collect::<Vec<NodeId>>(),
7359 need_backfill.len(),
7360 need_backfill.iter().cloned().collect::<Vec<NodeId>>(),
7361 participate_in_backfill.len(),
7362 participate_in_backfill
7363 .iter()
7364 .cloned()
7365 .collect::<Vec<NodeId>>()
7366 );
7367
7368 let replay_nodes = replay_sources.keys().cloned().collect::<BTreeSet<_>>();
7369
7370 assert!(
7371 replay_nodes
7372 .intersection(&need_backfill)
7373 .collect::<Vec<_>>()
7374 .is_empty()
7375 );
7376
7377 let nodes_to_backfill = participate_in_backfill
7380 .difference(&replay_nodes)
7381 .cloned()
7382 .collect::<BTreeSet<_>>();
7383
7384 if !participate_in_backfill.is_empty() {
7385 for node_id in replay_nodes.iter() {
7387 self.circuit
7388 .map_local_node_mut(*node_id, &mut |node| node.start_replay())?;
7389 }
7390
7391 for node_id in nodes_to_backfill.iter() {
7393 self.circuit
7394 .map_local_node_mut(*node_id, &mut |node| node.clear_state())?;
7395 }
7396
7397 self.executor
7399 .prepare(&self.circuit, Some(&participate_in_backfill))?;
7400
7401 let need_backfill = nodes_to_backfill
7446 .iter()
7447 .map(|node_id| {
7448 let pid = self.circuit.map_local_node_mut(*node_id, &mut |node| {
7449 node.get_label(LABEL_PERSISTENT_OPERATOR_ID)
7450 .map(|s| s.to_string())
7451 });
7452
7453 (*node_id, pid)
7454 })
7455 .collect::<BTreeMap<_, _>>();
7456
7457 let replay_info = BootstrapInfo {
7458 replay_sources: replay_sources.clone(),
7459 need_backfill,
7460 };
7461
7462 self.replay_info = Some(replay_info.clone());
7463
7464 Ok(Some(replay_info))
7465 } else {
7466 Ok(None)
7467 }
7468 }
7469
7470 fn invalidate_balancer_clusters(
7500 &self,
7501 need_backfill: &BTreeSet<GlobalNodeId>,
7502 ) -> BTreeSet<GlobalNodeId> {
7503 let need_backfill_node_ids: BTreeSet<NodeId> = need_backfill
7505 .iter()
7506 .map(|gid| gid.top_level_ancestor())
7507 .collect();
7508
7509 let additional_need_backfill = self
7511 .circuit
7512 .balancer()
7513 .invalidate_clusters_for_bootstrapping(&need_backfill_node_ids);
7514
7515 let nodes_to_add = self.propagate_need_backfill_forward(
7518 additional_need_backfill
7519 .difference(&need_backfill_node_ids)
7520 .cloned()
7521 .collect(),
7522 );
7523
7524 nodes_to_add
7526 .into_iter()
7527 .map(|node_id| GlobalNodeId::root().child(node_id))
7528 .collect()
7529 }
7530
7531 fn propagate_need_backfill_forward(
7533 &self,
7534 mut need_backfill: BTreeSet<NodeId>,
7535 ) -> BTreeSet<NodeId> {
7536 let mut worklist: Vec<NodeId> = need_backfill.iter().cloned().collect();
7538 let mut visited = BTreeSet::new();
7539
7540 while let Some(node_id) = worklist.pop() {
7541 if visited.contains(&node_id) {
7542 continue;
7543 }
7544 visited.insert(node_id);
7545
7546 let successors: Vec<NodeId> = self
7548 .circuit
7549 .edges()
7550 .by_source
7551 .get(&node_id)
7552 .into_iter()
7553 .flat_map(|edges| edges.iter().map(|edge| edge.to))
7554 .collect();
7555
7556 for successor in successors {
7557 if !visited.contains(&successor) {
7558 worklist.push(successor);
7559 need_backfill.insert(successor);
7560 }
7561 }
7562
7563 let dependencies: Vec<NodeId> = self
7565 .circuit
7566 .edges()
7567 .by_destination
7568 .get(&node_id)
7569 .into_iter()
7570 .flat_map(|edges| edges.iter())
7571 .filter(|edge| edge.is_dependency())
7572 .map(|edge| edge.from)
7573 .collect();
7574
7575 for dependency in dependencies {
7576 if !visited.contains(&dependency) {
7577 worklist.push(dependency);
7578 need_backfill.insert(dependency);
7579 }
7580 }
7581 }
7582
7583 need_backfill
7584 }
7585
7586 fn compute_replay_nodes_step(
7597 &self,
7598 replay_sources: &mut BTreeMap<NodeId, StreamId>,
7599 need_backfill: &BTreeSet<NodeId>,
7600 participate_in_backfill_new: BTreeSet<NodeId>,
7601 participate_in_backfill: &mut BTreeSet<NodeId>,
7602 ) -> Result<BTreeSet<NodeId>, DbspError> {
7603 let mut inputs = BTreeSet::new();
7604
7605 for node_id in participate_in_backfill_new.iter() {
7606 let node_inputs = self
7615 .circuit
7616 .edges()
7617 .by_destination
7618 .get(node_id)
7619 .iter()
7620 .flat_map(|edges| edges.iter())
7621 .filter(|edge| edge.is_stream())
7622 .map(|edge| {
7623 (Some(edge.stream_id().unwrap()), edge.from)
7627 })
7628 .collect::<Vec<_>>();
7629
7630 for input in node_inputs.into_iter() {
7631 inputs.insert(input);
7632 }
7633
7634 for edge in self.circuit.edges().dependencies_of(*node_id) {
7636 inputs.insert((None, edge.from));
7637 }
7638
7639 for edge in self.circuit.edges().depend_on(*node_id) {
7641 inputs.insert((None, edge.to));
7642 }
7643 }
7644
7645 let mut participate_in_backfill_new = BTreeSet::new();
7646
7647 let mut replay_streams = BTreeMap::new();
7648
7649 for (stream_id, mut node_id) in inputs.into_iter() {
7650 if let Some(stream_id) = stream_id
7655 && let Some(replay_source) = self.circuit.get_replay_source(stream_id)
7656 {
7657 if !need_backfill.contains(&replay_source.local_node_id()) {
7660 replay_streams.insert(stream_id, replay_source.clone());
7661 node_id = replay_source.local_node_id();
7669 }
7670 }
7671
7672 if !participate_in_backfill.contains(&node_id) {
7673 participate_in_backfill.insert(node_id);
7675 participate_in_backfill_new.insert(node_id);
7676 }
7677 }
7678
7679 for (original_stream, replay_stream) in replay_streams.into_iter() {
7681 replay_sources
7682 .entry(replay_stream.local_node_id())
7683 .or_insert_with(|| {
7684 self.circuit
7685 .add_replay_edges(original_stream, replay_stream.as_ref());
7686 replay_stream.stream_id()
7687 });
7688 }
7689
7690 Ok(participate_in_backfill_new)
7691 }
7692
7693 pub fn is_replay_complete(&self) -> bool {
7695 let Some(replay_info) = self.replay_info.as_ref() else {
7696 return true;
7697 };
7698
7699 let all_complete = replay_info.replay_sources.keys().all(|node_id| {
7703 self.circuit
7704 .map_local_node_mut(*node_id, &mut |node| node.is_replay_complete())
7705 });
7706
7707 all_complete && self.is_commit_complete()
7708 }
7709
7710 pub fn complete_replay(&mut self) -> Result<(), DbspError> {
7716 let Some(replay_info) = self.replay_info.take() else {
7719 return Ok(());
7720 };
7721
7722 for (node_id, stream_id) in replay_info.replay_sources.iter() {
7724 self.circuit
7725 .map_local_node_mut(*node_id, &mut |node| node.end_replay())?;
7726 self.circuit.edges_mut().delete_stream(*stream_id);
7727 }
7728
7729 self.executor.prepare(&self.circuit, None)?;
7731
7732 Ok(())
7758 }
7759
7760 pub fn fingerprint(&self) -> u64 {
7761 let mut fip = Fingerprinter::default();
7762 let _ = self.circuit.map_nodes_recursive(&mut |node: &dyn Node| {
7763 node.fingerprint(&mut fip);
7764 Ok(())
7765 });
7766 fip.finish()
7767 }
7768
7769 pub fn register_scheduler_event_handler<F>(&self, name: &str, handler: F)
7779 where
7780 F: FnMut(&SchedulerEvent<'_>) + 'static,
7781 {
7782 self.circuit.register_scheduler_event_handler(name, handler);
7783 }
7784
7785 pub fn unregister_scheduler_event_handler(&self, name: &str) -> bool {
7791 self.circuit.unregister_scheduler_event_handler(name)
7792 }
7793
7794 pub fn lir(&self) -> LirCircuit {
7796 (&self.circuit as &dyn CircuitBase).to_lir()
7797 }
7798
7799 pub fn set_auto_rebalance(&self, enable: bool) -> Result<(), DbspError> {
7800 self.circuit.set_auto_rebalance(enable)
7801 }
7802
7803 pub fn set_balancer_hint_by_global_id(
7804 &self,
7805 global_node_id: &GlobalNodeId,
7806 hint: BalancerHint,
7807 ) -> Result<(), DbspError> {
7808 self.circuit
7809 .set_balancer_hint_by_global_id(global_node_id, hint)
7810 }
7811
7812 pub fn set_balancer_hint(
7813 &self,
7814 persistent_id: &str,
7815 hint: BalancerHint,
7816 ) -> Result<(), DbspError> {
7817 self.circuit.set_balancer_hint(persistent_id, hint)
7818 }
7819
7820 pub fn get_current_balancer_policies(&self) -> BTreeMap<GlobalNodeId, PartitioningPolicy> {
7821 self.circuit
7822 .get_current_balancer_policies()
7823 .into_iter()
7824 .map(|(node_id, policy)| (GlobalNodeId::root().child(node_id), policy))
7825 .collect()
7826 }
7827
7828 pub fn get_current_balancer_policy(
7829 &self,
7830 persistent_id: &str,
7831 ) -> Result<PartitioningPolicy, DbspError> {
7832 self.circuit.get_current_balancer_policy(persistent_id)
7833 }
7834
7835 pub fn rebalance(&self) {
7836 self.circuit.rebalance()
7837 }
7838
7839 pub fn start_compaction(&self) {
7840 self.circuit.start_compaction()
7841 }
7842}
7843
7844pin_project! {
7845 #[derive(Debug)]
7856 pub struct Timed<T> {
7857 #[pin]
7859 task: T,
7860
7861 elapsed: Duration,
7863 }
7864}
7865
7866impl<T> Timed<T> {
7867 fn new(task: T) -> Self {
7868 Self {
7869 task,
7870 elapsed: Duration::ZERO,
7871 }
7872 }
7873}
7874
7875impl<T> Future for Timed<T>
7876where
7877 T: Future,
7878{
7879 type Output = (T::Output, Duration);
7880
7881 fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
7882 let this = self.project();
7883 let start = Instant::now();
7884 let ret = this.task.poll(cx);
7885 *this.elapsed += start.elapsed();
7886 ret.map(|value| (value, take(&mut *this.elapsed)))
7887 }
7888}
7889
7890#[cfg(test)]
7891mod tests {
7892 use crate::{
7893 Circuit, Error as DbspError, RootCircuit,
7894 circuit::schedule::{DynamicScheduler, Scheduler},
7895 monitor::TraceMonitor,
7896 operator::{Generator, Z1},
7897 };
7898 use anyhow::anyhow;
7899 use std::{cell::RefCell, ops::Deref, rc::Rc, vec::Vec};
7900
7901 #[test]
7902 fn sum_circuit_dynamic() {
7903 sum_circuit::<DynamicScheduler>();
7904 }
7905 fn sum_circuit<S>()
7907 where
7908 S: Scheduler + 'static,
7909 {
7910 let actual_output: Rc<RefCell<Vec<isize>>> = Rc::new(RefCell::new(Vec::with_capacity(100)));
7911 let actual_output_clone = actual_output.clone();
7912 let circuit = RootCircuit::build_with_scheduler::<_, _, S>(|circuit| {
7913 TraceMonitor::new_panic_on_error().attach(circuit, "monitor");
7914 let mut n: isize = 0;
7915 let source = circuit.add_source(Generator::new(move || {
7916 let result = n;
7917 n += 1;
7918 result
7919 }));
7920 let integrator = source.integrate();
7921 integrator.inspect(|n| println!("{}", n));
7922 integrator.inspect(move |n| actual_output_clone.borrow_mut().push(*n));
7923 Ok(())
7924 })
7925 .unwrap()
7926 .0;
7927
7928 for _ in 0..100 {
7929 circuit.transaction().unwrap();
7930 }
7931
7932 let mut sum = 0;
7933 let mut expected_output: Vec<isize> = Vec::with_capacity(100);
7934 for i in 0..100 {
7935 sum += i;
7936 expected_output.push(sum);
7937 }
7938 assert_eq!(&expected_output, actual_output.borrow().deref());
7939 }
7940
7941 #[test]
7942 fn recursive_sum_circuit_dynamic() {
7943 recursive_sum_circuit::<DynamicScheduler>()
7944 }
7945
7946 fn recursive_sum_circuit<S>()
7947 where
7948 S: Scheduler + 'static,
7949 {
7950 let actual_output: Rc<RefCell<Vec<usize>>> = Rc::new(RefCell::new(Vec::with_capacity(100)));
7951 let actual_output_clone = actual_output.clone();
7952
7953 let circuit = RootCircuit::build_with_scheduler::<_, _, S>(|circuit| {
7954 TraceMonitor::new_panic_on_error().attach(circuit, "monitor");
7955
7956 let mut n: usize = 0;
7957 let source = circuit.add_source(Generator::new(move || {
7958 let result = n;
7959 n += 1;
7960 result
7961 }));
7962 let (z1_output, z1_feedback) = circuit.add_feedback(Z1::new(0));
7963 let plus = source
7964 .apply2(&z1_output, |n1: &usize, n2: &usize| *n1 + *n2)
7965 .inspect(move |n| actual_output_clone.borrow_mut().push(*n));
7966 z1_feedback.connect(&plus);
7967 Ok(())
7968 })
7969 .unwrap()
7970 .0;
7971
7972 for _ in 0..100 {
7973 circuit.transaction().unwrap();
7974 }
7975
7976 let mut sum = 0;
7977 let mut expected_output: Vec<usize> = Vec::with_capacity(100);
7978 for i in 0..100 {
7979 sum += i;
7980 expected_output.push(sum);
7981 }
7982 assert_eq!(&expected_output, actual_output.borrow().deref());
7983 }
7984
7985 #[test]
7986 fn factorial_dynamic() {
7987 factorial::<DynamicScheduler>();
7988 }
7989
7990 fn factorial<S>()
7996 where
7997 S: Scheduler + 'static,
7998 {
7999 let actual_output: Rc<RefCell<Vec<usize>>> = Rc::new(RefCell::new(Vec::with_capacity(100)));
8000 let actual_output_clone = actual_output.clone();
8001
8002 let circuit = RootCircuit::build_with_scheduler::<_, _, S>(|circuit| {
8003 TraceMonitor::new_panic_on_error().attach(circuit, "monitor");
8004
8005 let mut n: usize = 0;
8006 let source = circuit.add_source(Generator::new(move || {
8007 n += 1;
8008 n
8009 }));
8010 let fact = circuit
8011 .iterate_with_condition_and_scheduler::<_, _, S>(|child| {
8012 let mut counter = 0;
8013 let countdown = source.delta0(child).apply_mut(move |parent_val| {
8014 if *parent_val > 0 {
8015 counter = *parent_val;
8016 };
8017 let res = counter;
8018 counter -= 1;
8019 res
8020 });
8021 let (z1_output, z1_feedback) = child.add_feedback_with_export(Z1::new(1));
8022 let mul = countdown.apply2(&z1_output.local, |n1: &usize, n2: &usize| n1 * n2);
8023 z1_feedback.connect(&mul);
8024 Ok((countdown.condition(|n| *n <= 1), z1_output.export))
8025 })
8026 .unwrap();
8027 fact.inspect(move |n| actual_output_clone.borrow_mut().push(*n));
8028 Ok(())
8029 })
8030 .unwrap()
8031 .0;
8032
8033 for _ in 1..10 {
8034 circuit.transaction().unwrap();
8035 }
8036
8037 let mut expected_output: Vec<usize> = Vec::with_capacity(10);
8038 for i in 1..10 {
8039 expected_output.push(my_factorial(i));
8040 }
8041 assert_eq!(&expected_output, actual_output.borrow().deref());
8042 }
8043
8044 fn my_factorial(n: usize) -> usize {
8045 if n == 1 { 1 } else { n * my_factorial(n - 1) }
8046 }
8047
8048 #[test]
8051 fn open_close_region() {
8052 const REGION: &str = "my_region";
8053
8054 let monitor = TraceMonitor::new_panic_on_error();
8055 let region: Rc<RefCell<Option<super::RegionName>>> = Rc::new(RefCell::new(None));
8056
8057 let _circuit = RootCircuit::build({
8058 let monitor = monitor.clone();
8059 let region = region.clone();
8060 move |circuit| {
8061 monitor.attach(circuit, "monitor");
8062
8063 let r = circuit.create_region_name(REGION, 0);
8064
8065 circuit.open_region(r.clone());
8066 let source1 = circuit.add_source(Generator::new(|| 1_i32));
8067 circuit.close_region(r.clone());
8068
8069 circuit.open_region(r.clone());
8070 let source2 = circuit.add_source(Generator::new(|| 2_i32));
8071 circuit.close_region(r.clone());
8072
8073 circuit.open_region(r.clone());
8074 source1
8075 .apply2(&source2, |a: &i32, b: &i32| a + b)
8076 .inspect(|_| {});
8077 circuit.close_region(r.clone());
8078
8079 *region.borrow_mut() = Some(r);
8080 Ok(())
8081 }
8082 })
8083 .unwrap()
8084 .0;
8085
8086 let region = region.borrow();
8088 assert_eq!(
8089 monitor.count_nodes_in_region(region.as_ref().unwrap()),
8090 Some(4)
8091 );
8092 }
8093
8094 #[test]
8098 fn separate_create_region_same_name() {
8099 const REGION: &str = "my_region";
8100
8101 let monitor = TraceMonitor::new_panic_on_error();
8102 let r1: Rc<RefCell<Option<super::RegionName>>> = Rc::new(RefCell::new(None));
8104 let r2: Rc<RefCell<Option<super::RegionName>>> = Rc::new(RefCell::new(None));
8105
8106 let _circuit = RootCircuit::build({
8107 let monitor = monitor.clone();
8108 let r1 = r1.clone();
8109 let r2 = r2.clone();
8110 move |circuit| {
8111 monitor.attach(circuit, "monitor");
8112
8113 let region1 = circuit.create_region_name(REGION, 0);
8114 let region2 = circuit.create_region_name(REGION, 1);
8115
8116 circuit.open_region(region1.clone());
8117 circuit.add_source(Generator::new(|| 1_i32));
8118 circuit.close_region(region1.clone());
8119
8120 circuit.open_region(region2.clone());
8121 circuit.add_source(Generator::new(|| 2_i32));
8122 circuit.close_region(region2.clone());
8123
8124 *r1.borrow_mut() = Some(region1);
8125 *r2.borrow_mut() = Some(region2);
8126 Ok(())
8127 }
8128 })
8129 .unwrap()
8130 .0;
8131
8132 let r1 = r1.borrow();
8133 let r2 = r2.borrow();
8134 assert_eq!(monitor.count_regions_with_name(REGION), 2);
8136 assert_eq!(monitor.count_nodes_in_region(r1.as_ref().unwrap()), Some(1));
8137 assert_eq!(monitor.count_nodes_in_region(r2.as_ref().unwrap()), Some(1));
8138 }
8139
8140 #[test]
8141 fn init_circuit_constructor_error() {
8142 match RootCircuit::build(|_circuit| Err::<(), _>(anyhow!("constructor failed"))) {
8143 Err(DbspError::Constructor(msg)) => assert_eq!(msg.to_string(), "constructor failed"),
8144 _ => panic!(),
8145 }
8146 }
8147}