1use crate::{
26 Error as DbspError, Position, Runtime,
27 circuit::{
28 cache::{CircuitCache, CircuitStoreMarker},
29 fingerprinter::Fingerprinter,
30 metadata::OperatorMeta,
31 metrics::DBSP_OPERATOR_COMMIT_LATENCY_MICROSECONDS,
32 operator_traits::{
33 BinaryOperator, BinarySinkOperator, Data, ImportOperator, NaryOperator,
34 QuaternaryOperator, SinkOperator, SourceOperator, StrictUnaryOperator, TernaryOperator,
35 TernarySinkOperator, UnaryOperator,
36 },
37 runtime::Consensus,
38 schedule::{
39 CommitProgress, DynamicScheduler, Error as SchedulerError, Executor, IterativeExecutor,
40 OnceExecutor, Scheduler,
41 },
42 trace::{CircuitEvent, SchedulerEvent},
43 },
44 circuit_cache_key,
45 ir::LABEL_MIR_NODE_ID,
46 operator::dynamic::balance::{Balancer, BalancerError, BalancerHint, PartitioningPolicy},
47 time::{Timestamp, UnitTimestamp},
48};
49#[cfg(doc)]
50use crate::{
51 InputHandle, OutputHandle,
52 algebra::{IndexedZSet, ZSet},
53 operator::{Aggregator, Fold, Generator, Max, Min, time_series::RelRange},
54 trace::Batch,
55};
56use anyhow::Error as AnyError;
57use dyn_clone::{DynClone, clone_box};
58use feldera_ir::{LirCircuit, LirNodeId};
59use feldera_storage::{FileCommitter, StoragePath};
60use serde::{Deserialize, Serialize, Serializer, de::DeserializeOwned};
61use std::{
62 any::{Any, TypeId, type_name_of_val},
63 borrow::Cow,
64 cell::{Ref, RefCell, RefMut},
65 collections::{BTreeMap, BTreeSet, HashMap},
66 fmt::{self, Debug, Display, Write},
67 future::Future,
68 io::ErrorKind,
69 marker::PhantomData,
70 mem::transmute,
71 ops::Deref,
72 panic::Location,
73 pin::Pin,
74 rc::Rc,
75 sync::Arc,
76 thread::panicking,
77};
78use tokio::{runtime::Runtime as TokioRuntime, task::LocalSet};
79use tracing::debug;
80use typedmap::{TypedMap, TypedMapKey};
81
82use super::dbsp_handle::Mode;
83
/// Label key under which an operator's persistent identifier is stored.
///
/// Written by `Stream::set_persistent_id` / `CircuitBase::set_persistent_node_id`
/// and read back by `Stream::get_persistent_id`, `CircuitBase::get_persistent_node_id`
/// and the default `Node::persistent_id`.
const LABEL_PERSISTENT_OPERATOR_ID: &str = "persistent_id";
87
/// Value slot shared between the producer of a stream and its consumers.
///
/// The producer stores a value with [`StreamValue::put`]; each consumer then
/// gives up one token via [`StreamValue::consume_token`], and the value is
/// dropped once the last token is gone.
struct StreamValue<D> {
    /// Currently stored value, or `None` when the slot is empty.
    val: Option<D>,

    /// Number of consumers registered for this stream.
    consumers: usize,

    /// Number of consumers that have not yet consumed the current value.
    /// Reset to `consumers` by `put` and decremented by `consume_token`.
    tokens: RefCell<usize>,
}

impl<D> StreamValue<D> {
    /// Creates a slot with no value, no consumers and no outstanding tokens.
    const fn empty() -> Self {
        Self {
            val: None,
            consumers: 0,
            tokens: RefCell::new(0),
        }
    }

    /// Stores `val` and hands out one token per registered consumer.
    ///
    /// When no consumer is registered the value is simply dropped and the
    /// slot stays empty. In debug builds, asserts that the slot was empty.
    fn put(&mut self, val: D) {
        debug_assert!(self.val.is_none());

        if self.consumers == 0 {
            // Nobody will ever read the value: discard it.
            return;
        }

        self.tokens = RefCell::new(self.consumers);
        self.val = Some(val);
    }

    /// Borrows the stored value through any smart pointer to the slot.
    ///
    /// # Panics
    ///
    /// Panics if the slot is empty. In debug builds, also asserts that at
    /// least one token is outstanding.
    fn peek<R>(this: &R) -> &D
    where
        R: Deref<Target = Self>,
    {
        let slot: &Self = <R as Deref>::deref(this);
        debug_assert_ne!(*slot.tokens.borrow(), 0);

        slot.val.as_ref().unwrap()
    }

    /// Moves the value out of the slot if exactly one token is outstanding
    /// (i.e. the caller is the last consumer); otherwise returns `None` and
    /// leaves the slot untouched. The token count itself is not modified.
    ///
    /// # Panics
    ///
    /// Panics if exactly one token is outstanding but the slot is empty.
    fn take(this: &RefCell<Self>) -> Option<D>
    where
        D: Clone,
    {
        // The shared borrow ends with this statement, so the mutable borrow
        // below cannot conflict with it.
        let remaining = *this.borrow().tokens.borrow();
        debug_assert_ne!(remaining, 0);

        (remaining == 1).then(|| this.borrow_mut().val.take().unwrap())
    }

    /// Gives up one token; when the last token is consumed, clears the slot.
    fn consume_token(this: &RefCell<Self>) {
        let exhausted = {
            let slot = this.borrow();
            let mut remaining = slot.tokens.borrow_mut();
            debug_assert_ne!(*remaining, 0);
            *remaining -= 1;
            *remaining == 0
        };

        if exhausted {
            // All shared borrows were released at the end of the block above.
            this.borrow_mut().val.take();
        }
    }
}
174
/// Reference-counted, interior-mutable handle to a `StreamValue`.
///
/// Cloning the handle clones the `Rc`, so all clones refer to the same slot.
#[repr(transparent)]
pub struct RefStreamValue<D>(Rc<RefCell<StreamValue<D>>>);
177
178impl<D> Clone for RefStreamValue<D> {
179 fn clone(&self) -> Self {
180 Self(self.0.clone())
181 }
182}
183
184impl<D> RefStreamValue<D> {
185 pub fn empty() -> Self {
186 Self(Rc::new(RefCell::new(StreamValue::empty())))
187 }
188
189 fn get_mut(&self) -> RefMut<'_, StreamValue<D>> {
190 self.0.borrow_mut()
191 }
192
193 fn get(&self) -> Ref<'_, StreamValue<D>> {
194 self.0.borrow()
195 }
196
197 pub fn put(&self, d: D) {
203 let mut val = self.get_mut();
204 val.put(d);
205 }
206
207 unsafe fn transmute<D2>(&self) -> RefStreamValue<D2> {
208 unsafe {
209 RefStreamValue(std::mem::transmute::<
210 Rc<RefCell<StreamValue<D>>>,
211 Rc<RefCell<StreamValue<D2>>>,
212 >(self.0.clone()))
213 }
214 }
215}
216
/// Type-erased view of a stream: exposes identifiers and consumer
/// bookkeeping without naming the stream's payload type.
///
/// Implemented in this file by `Stream<C, D>`.
pub trait StreamMetadata: DynClone + 'static {
    /// Identifier of the stream.
    fn stream_id(&self) -> StreamId;
    /// Local id of the node the stream is attached to.
    fn local_node_id(&self) -> NodeId;
    /// Global id of the node the stream originates from.
    fn origin_node_id(&self) -> &GlobalNodeId;
    /// Number of consumers currently registered on the stream.
    fn num_consumers(&self) -> usize;

    /// Resets the registered consumer count to zero.
    fn clear_consumer_count(&self);

    /// Registers one more consumer on the stream.
    fn register_consumer(&self);
}

// Makes `Box<dyn StreamMetadata>` cloneable.
dyn_clone::clone_trait_object!(StreamMetadata);
236
/// A stream of values of type `D` inside circuit `C`.
///
/// Clones of a stream share the same value slot (`val`).
pub struct Stream<C, D> {
    /// Identifier of the stream; used by `ptr_eq` to compare streams.
    stream_id: StreamId,
    /// Id of the node this stream is attached to within `circuit`.
    local_node_id: NodeId,
    /// Global id of the node the stream originates from. `new`/`with_value`
    /// derive it from `circuit` and `local_node_id`; `with_origin` lets the
    /// caller supply a different one.
    origin_node_id: GlobalNodeId,
    /// Circuit the stream belongs to.
    circuit: C,
    /// Shared slot holding the stream's current value.
    val: RefStreamValue<D>,
}
698
699impl<C, D> StreamMetadata for Stream<C, D>
700where
701 C: Clone + 'static,
702 D: 'static,
703{
704 fn stream_id(&self) -> StreamId {
705 self.stream_id
706 }
707 fn local_node_id(&self) -> NodeId {
708 self.local_node_id
709 }
710 fn origin_node_id(&self) -> &GlobalNodeId {
711 &self.origin_node_id
712 }
713 fn clear_consumer_count(&self) {
714 self.val.get_mut().consumers = 0;
715 }
716 fn num_consumers(&self) -> usize {
717 self.val.get().consumers
718 }
719 fn register_consumer(&self) {
720 self.val.get_mut().consumers += 1;
721 }
722}
723
724impl<C, D> Clone for Stream<C, D>
725where
726 C: Clone,
727{
728 fn clone(&self) -> Self {
729 Self {
730 stream_id: self.stream_id,
731 local_node_id: self.local_node_id,
732 origin_node_id: self.origin_node_id.clone(),
733 circuit: self.circuit.clone(),
734 val: self.val.clone(),
735 }
736 }
737}
738
739impl<C, D> Stream<C, D>
740where
741 C: Clone,
742{
743 pub(crate) unsafe fn transmute_payload<D2>(&self) -> Stream<C, D2> {
752 unsafe {
753 Stream {
754 stream_id: self.stream_id,
755 local_node_id: self.local_node_id,
756 origin_node_id: self.origin_node_id.clone(),
757 circuit: self.circuit.clone(),
758 val: self.val.transmute::<D2>(),
759 }
760 }
761 }
762}
763
764impl<C, D> Stream<C, D> {
765 pub fn local_node_id(&self) -> NodeId {
771 self.local_node_id
772 }
773
774 pub fn origin_node_id(&self) -> &GlobalNodeId {
780 &self.origin_node_id
781 }
782
783 pub fn stream_id(&self) -> StreamId {
784 self.stream_id
785 }
786
787 pub fn circuit(&self) -> &C {
789 &self.circuit
790 }
791
792 pub fn ptr_eq<D2>(&self, other: &Stream<C, D2>) -> bool {
793 self.stream_id() == other.stream_id()
794 }
795}
796
797impl<C, D> Stream<C, D>
799where
800 C: Circuit,
801{
802 fn new(circuit: C, node_id: NodeId) -> Self {
805 Self {
806 stream_id: circuit.allocate_stream_id(),
807 local_node_id: node_id,
808 origin_node_id: GlobalNodeId::child_of(&circuit, node_id),
809 circuit,
810 val: RefStreamValue::empty(),
811 }
812 }
813
814 pub fn with_value(circuit: C, node_id: NodeId, val: RefStreamValue<D>) -> Self {
816 Self {
817 stream_id: circuit.allocate_stream_id(),
818 local_node_id: node_id,
819 origin_node_id: GlobalNodeId::child_of(&circuit, node_id),
820 circuit,
821 val,
822 }
823 }
824
825 pub fn value(&self) -> RefStreamValue<D> {
826 self.val.clone()
827 }
828
829 pub fn export(&self) -> Stream<C::Parent, D>
837 where
838 C::Parent: Circuit,
839 D: 'static,
840 {
841 self.circuit()
842 .cache_get_or_insert_with(ExportId::new(self.stream_id()), || unimplemented!())
843 .clone()
844 }
845
846 pub fn set_label(&self, key: &str, val: &str) -> Self {
848 self.circuit.set_node_label(&self.origin_node_id, key, val);
849 self.clone()
850 }
851
852 pub fn get_label(&self, key: &str) -> Option<String> {
854 self.circuit.get_node_label(&self.origin_node_id, key)
855 }
856
857 pub fn set_persistent_id(&self, name: Option<&str>) -> Self {
859 if let Some(name) = name {
860 self.set_label(LABEL_PERSISTENT_OPERATOR_ID, name)
861 } else {
862 self.clone()
863 }
864 }
865
866 pub fn get_persistent_id(&self) -> Option<String> {
868 self.get_label(LABEL_PERSISTENT_OPERATOR_ID)
869 }
870}
871
872impl<C, D> Stream<C, D> {
873 fn with_origin(
875 circuit: C,
876 stream_id: StreamId,
877 node_id: NodeId,
878 origin_node_id: GlobalNodeId,
879 ) -> Self {
880 Self {
881 stream_id,
882 local_node_id: node_id,
883 origin_node_id,
884 circuit,
885 val: RefStreamValue::empty(),
886 }
887 }
888}
889
890impl<C, D> Stream<C, D>
891where
892 D: Clone,
893{
894 fn get(&self) -> Ref<'_, StreamValue<D>> {
895 self.val.get()
896 }
897
898 fn val(&self) -> &RefCell<StreamValue<D>> {
899 &self.val.0
900 }
901
902 fn put(&self, d: D) {
909 self.val.put(d);
910 }
911}
912
/// Pair of streams carrying the same payload type: one in circuit `C` and one
/// in its parent circuit.
pub struct ExportStream<C, D>
where
    C: Circuit,
{
    /// Stream that lives in circuit `C`.
    pub local: Stream<C, D>,
    /// Counterpart stream that lives in `C::Parent`.
    pub export: Stream<C::Parent, D>,
}
928
/// Scope index passed to clock and fixed-point callbacks such as
/// `Node::clock_start`, `Node::clock_end` and `Node::fixedpoint`.
pub type Scope = u16;
935
936pub trait Node: Any {
939 fn local_id(&self) -> NodeId;
941
942 fn global_id(&self) -> &GlobalNodeId;
944
945 fn persistent_id(&self) -> Option<String> {
958 let worker_index = Runtime::worker_index();
959
960 match Runtime::mode() {
961 Mode::Ephemeral => Some(format!(
962 "{worker_index}-{}",
963 self.global_id().path_as_string()
964 )),
965 Mode::Persistent => self
966 .get_label(LABEL_PERSISTENT_OPERATOR_ID)
967 .map(|operator_id| format!("{worker_index}-{operator_id}")),
968 }
969 }
970
971 fn name(&self) -> Cow<'static, str>;
973
974 fn is_circuit(&self) -> bool {
975 false
976 }
977
978 fn is_input(&self) -> bool;
980
981 fn is_async(&self) -> bool;
985
986 fn ready(&self) -> bool;
990
991 fn register_ready_callback(&mut self, _cb: Box<dyn Fn() + Send + Sync>) {}
995
996 fn eval<'a>(
1000 &'a mut self,
1001 ) -> Pin<Box<dyn Future<Output = Result<Option<Position>, SchedulerError>> + 'a>>;
1002
1003 fn import(&mut self) {}
1004
1005 fn start_transaction(&mut self);
1007
1008 fn flush(&mut self);
1011
1012 fn is_flush_complete(&self) -> bool;
1014
1015 fn clock_start(&mut self, scope: Scope);
1025
1026 fn clock_end(&mut self, scope: Scope);
1034
1035 fn init(&mut self) {}
1036
1037 fn metadata(&self, output: &mut OperatorMeta);
1038
1039 fn fixedpoint(&self, scope: Scope) -> bool;
1040
1041 fn map_nodes_recursive(
1044 &self,
1045 _f: &mut dyn FnMut(&dyn Node) -> Result<(), DbspError>,
1046 ) -> Result<(), DbspError> {
1047 Ok(())
1048 }
1049
1050 fn map_nodes_recursive_mut(
1053 &self,
1054 _f: &mut dyn FnMut(&mut dyn Node) -> Result<(), DbspError>,
1055 ) -> Result<(), DbspError> {
1056 Ok(())
1057 }
1058
1059 fn checkpoint(
1065 &mut self,
1066 base: &StoragePath,
1067 files: &mut Vec<Arc<dyn FileCommitter>>,
1068 ) -> Result<(), DbspError>;
1069
1070 fn restore(&mut self, base: &StoragePath) -> Result<(), DbspError>;
1073
1074 fn clear_state(&mut self) -> Result<(), DbspError>;
1080
1081 fn start_replay(&mut self) -> Result<(), DbspError>;
1090
1091 fn is_replay_complete(&self) -> bool;
1097
1098 fn end_replay(&mut self) -> Result<(), DbspError>;
1106
1107 fn fingerprint(&self, fip: &mut Fingerprinter) {
1109 fip.hash(type_name_of_val(self));
1110 }
1111
1112 fn set_label(&mut self, key: &str, value: &str);
1114
1115 fn get_label(&self, key: &str) -> Option<&str>;
1117
1118 fn labels(&self) -> &BTreeMap<String, String>;
1119
1120 fn map_child(&self, _path: &[NodeId], _f: &mut dyn FnMut(&dyn Node)) {
1122 panic!("map_child: not a circuit node")
1123 }
1124
1125 fn map_child_mut(&self, _path: &[NodeId], _f: &mut dyn FnMut(&mut dyn Node)) {
1127 panic!("map_child_mut: not a circuit node")
1128 }
1129
1130 fn as_circuit(&self) -> Option<&dyn CircuitBase> {
1131 None
1132 }
1133
1134 fn as_any(&self) -> &dyn Any;
1135}
1136
/// Identifier of a stream: a thin wrapper around a `usize`.
///
/// Displayed as `s<number>`.
#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord, Hash, Deserialize, Serialize)]
#[repr(transparent)]
pub struct StreamId(usize);
1141
1142impl StreamId {
1143 pub fn new(id: usize) -> Self {
1144 Self(id)
1145 }
1146
1147 pub fn id(&self) -> usize {
1149 self.0
1150 }
1151}
1152
1153impl Display for StreamId {
1154 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
1155 f.write_char('s')?;
1156 Debug::fmt(&self.0, f)
1157 }
1158}
1159
/// Identifier of a node: a thin wrapper around a `usize`.
///
/// Displayed as `n<number>`. A sequence of `NodeId`s forms a `GlobalNodeId`.
#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord, Hash, Serialize, Deserialize)]
#[repr(transparent)]
pub struct NodeId(usize);
1164
1165impl NodeId {
1166 pub fn new(id: usize) -> Self {
1167 Self(id)
1168 }
1169
1170 pub fn id(&self) -> usize {
1172 self.0
1173 }
1174
1175 pub(super) fn root() -> Self {
1176 Self(0)
1177 }
1178}
1179
1180impl Display for NodeId {
1181 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
1182 f.write_char('n')?;
1183 Debug::fmt(&self.0, f)
1184 }
1185}
1186
/// Path of `NodeId`s identifying a node across nested circuits.
///
/// The last element is the node's local id (`local_node_id`), the prefix
/// before it is the parent's path (`parent_id`), and the empty path is the
/// root (`root`).
#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord, Hash)]
#[repr(transparent)]
pub struct GlobalNodeId(Vec<NodeId>);
1197
1198impl Serialize for GlobalNodeId {
1199 fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
1201 where
1202 S: Serializer,
1203 {
1204 let s = self.node_identifier();
1205 serializer.serialize_str(&s)
1206 }
1207}
1208
1209impl Display for GlobalNodeId {
1210 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
1211 f.write_str("[")?;
1212 let path = self.path();
1213 for i in 0..path.len() {
1214 f.write_str(&path[i].0.to_string())?;
1215 if i < path.len() - 1 {
1216 f.write_str(".")?;
1217 }
1218 }
1219 f.write_str("]")
1220 }
1221}
1222
1223impl GlobalNodeId {
1224 pub fn from_path(path: &[NodeId]) -> Self {
1226 Self(path.to_owned())
1227 }
1228
1229 pub fn from_path_vec(path: Vec<NodeId>) -> Self {
1231 Self(path)
1232 }
1233
1234 pub fn root() -> Self {
1235 Self(Vec::new())
1236 }
1237
1238 pub fn child(&self, child_id: NodeId) -> Self {
1240 let mut path = Vec::with_capacity(self.path().len() + 1);
1241 for id in self.path() {
1242 path.push(*id);
1243 }
1244 path.push(child_id);
1245 Self(path)
1246 }
1247
1248 pub fn child_of<C>(circuit: &C, node_id: NodeId) -> Self
1250 where
1251 C: Circuit,
1252 {
1253 let mut ids = circuit.global_node_id().path().to_owned();
1254 ids.push(node_id);
1255 Self(ids)
1256 }
1257
1258 pub fn node_identifier(&self) -> String {
1260 let mut node_ident = "n".to_string();
1261
1262 for i in 0..self.path().len() {
1263 node_ident.push_str(&self.path()[i].to_string());
1264 if i < self.path().len() - 1 {
1265 node_ident.push('_');
1266 }
1267 }
1268 node_ident
1269 }
1270
1271 pub fn local_node_id(&self) -> Option<NodeId> {
1273 self.0.last().cloned()
1274 }
1275
1276 pub fn parent_id(&self) -> Option<Self> {
1278 self.0
1279 .split_last()
1280 .map(|(_, prefix)| GlobalNodeId::from_path(prefix))
1281 }
1282
1283 pub fn is_child_of(&self, parent: &Self) -> bool {
1285 self.parent_id().as_ref() == Some(parent)
1286 }
1287
1288 pub fn path(&self) -> &[NodeId] {
1290 &self.0
1291 }
1292
1293 pub fn top_level_ancestor(&self) -> NodeId {
1302 self.0[0]
1303 }
1304
1305 pub(crate) fn path_as_string(&self) -> String {
1307 self.0
1308 .iter()
1309 .map(|node_id| node_id.0.to_string())
1310 .collect::<Vec<_>>()
1311 .join("-")
1312 }
1313
1314 pub fn lir_node_id(&self) -> LirNodeId {
1316 LirNodeId::new(&self.path_as_string())
1317 }
1318}
1319
/// Callback invoked with a circuit event.
type CircuitEventHandler = Box<dyn Fn(&CircuitEvent)>;
/// Callback invoked with a scheduler event; may mutate its captured state.
type SchedulerEventHandler = Box<dyn FnMut(&SchedulerEvent<'_>)>;
/// Shared, interior-mutable map of circuit event handlers keyed by a string.
type CircuitEventHandlers = Rc<RefCell<HashMap<String, CircuitEventHandler>>>;
/// Shared, interior-mutable map of scheduler event handlers keyed by a string.
type SchedulerEventHandlers = Rc<RefCell<HashMap<String, SchedulerEventHandler>>>;
1324
/// Numeric ownership preference attached to a stream connection.
///
/// Preferences are plain `usize` values with derived ordering; a handful of
/// named levels are provided as associated constants.
#[derive(Clone, Copy, PartialEq, Eq, PartialOrd, Ord, Debug, Hash)]
#[repr(transparent)]
pub struct OwnershipPreference(usize);

impl OwnershipPreference {
    /// Creates a preference with the given raw strength.
    pub const fn new(val: usize) -> Self {
        OwnershipPreference(val)
    }

    /// Raw strength `0`; also the `Default`.
    pub const INDIFFERENT: Self = Self::new(0);

    /// Raw strength `40`.
    pub const WEAKLY_PREFER_OWNED: Self = Self::new(40);

    /// Raw strength `50`.
    pub const PREFER_OWNED: Self = Self::new(50);

    /// Raw strength `100`.
    pub const STRONGLY_PREFER_OWNED: Self = Self::new(100);

    /// Returns the raw strength.
    pub const fn raw(&self) -> usize {
        let Self(strength) = *self;
        strength
    }
}

impl Default for OwnershipPreference {
    /// The default preference is [`OwnershipPreference::INDIFFERENT`].
    #[inline]
    fn default() -> Self {
        Self::INDIFFERENT
    }
}

impl Display for OwnershipPreference {
    /// Named levels print their name (e.g. `PreferOwned`); any other value
    /// prints as `Preference(<raw>)`.
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        const NAMED_LEVELS: [(OwnershipPreference, &str); 4] = [
            (OwnershipPreference::INDIFFERENT, "Indifferent"),
            (OwnershipPreference::WEAKLY_PREFER_OWNED, "WeaklyPreferOwned"),
            (OwnershipPreference::PREFER_OWNED, "PreferOwned"),
            (
                OwnershipPreference::STRONGLY_PREFER_OWNED,
                "StronglyPreferOwned",
            ),
        ];

        match NAMED_LEVELS.iter().find(|(level, _)| level == self) {
            Some((_, name)) => f.write_str(name),
            None => write!(f, "Preference({})", self.0),
        }
    }
}
1419
/// An edge of the circuit graph between two nodes of the same circuit.
#[derive(Clone)]
pub struct Edge {
    /// Source node of the edge.
    pub from: NodeId,
    /// Destination node of the edge.
    pub to: NodeId,
    /// Global id of an origin node associated with the edge.
    // NOTE(review): presumably the origin of the stream the edge carries — confirm.
    pub origin: GlobalNodeId,
    /// Metadata of the stream carried by the edge; `None` when the edge is
    /// not a stream edge (see `is_stream`).
    pub stream: Option<Box<dyn StreamMetadata>>,
    /// Ownership preference of the edge; `None` marks a dependency edge (see
    /// `is_dependency`).
    pub ownership_preference: Option<OwnershipPreference>,
}
1441
1442#[allow(dead_code)]
1443impl Edge {
1444 pub(crate) fn is_dependency(&self) -> bool {
1446 self.ownership_preference.is_none()
1447 }
1448
1449 pub(crate) fn is_stream(&self) -> bool {
1451 self.stream.is_some()
1452 }
1453
1454 pub(crate) fn stream_id(&self) -> Option<StreamId> {
1455 self.stream.as_ref().map(|meta| meta.stream_id())
1456 }
1457}
1458
// Cache key mapping a stream id to a `Stream<C, D>`; looked up by
// `Stream::export`.
circuit_cache_key!(ExportId<C, D>(StreamId => Stream<C, D>));

// Cache key mapping a stream id to the metadata of its replay stream; written
// by `register_replay_stream` and read by `Circuit::get_replay_source`.
circuit_cache_key!(ReplaySource(StreamId => Box<dyn StreamMetadata>));
1464
1465pub(crate) fn register_replay_stream<C, B>(
1467 circuit: &C,
1468 stream: &Stream<C, B>,
1469 replay_stream: &Stream<C, B>,
1470) where
1471 C: Circuit,
1472 B: 'static,
1473{
1474 if TypeId::of::<()>() == TypeId::of::<C::Time>() {
1477 circuit.cache_insert(
1478 ReplaySource::new(stream.stream_id()),
1479 Box::new(replay_stream.clone()),
1480 );
1481 }
1482}
1483
/// Something that can report a current timestamp.
pub trait WithClock {
    /// Timestamp type reported by this clock.
    type Time: Timestamp;

    /// Returns the current timestamp.
    fn time(&self) -> Self::Time;
}
1494
/// Trivial clock: the unit type always reports `UnitTimestamp`.
impl WithClock for () {
    type Time = UnitTimestamp;

    /// Always returns `UnitTimestamp`.
    fn time(&self) -> Self::Time {
        UnitTimestamp
    }
}
1505
1506impl<P, T> WithClock for ChildCircuit<P, T>
1507where
1508 P: 'static,
1509 T: Timestamp,
1510{
1511 type Time = T;
1512
1513 fn time(&self) -> Self::Time {
1514 self.time.borrow().clone()
1515 }
1516}
1517
/// Per-operator metadata of a circuit: one JSON value per `NodeId`.
#[derive(Default, Debug, Clone, Serialize, Deserialize)]
pub struct CircuitMetadata {
    /// Metadata values keyed by the operator's node id.
    metadata: HashMap<NodeId, serde_json::Value>,
}
1523
/// Shared state behind a `MetadataExchange` handle.
#[derive(Default, Debug)]
pub struct MetadataExchangeInner {
    /// Metadata set locally via the `set_local_operator_metadata*` methods.
    local_metadata: RefCell<CircuitMetadata>,

    /// Collection of `CircuitMetadata` snapshots installed by
    /// `set_global_metadata`.
    // NOTE(review): presumably one entry per worker — confirm with the caller
    // of `set_global_metadata`.
    global_metadata: RefCell<Vec<CircuitMetadata>>,
}
1533
/// Cheaply cloneable handle to the metadata exchange state; all clones share
/// the same `MetadataExchangeInner` through an `Rc`.
#[derive(Default, Debug, Clone)]
pub struct MetadataExchange {
    /// Shared state.
    inner: Rc<MetadataExchangeInner>,
}
1547
1548impl MetadataExchange {
1549 fn new() -> Self {
1550 Self::default()
1551 }
1552
1553 pub fn local_metadata(&self) -> CircuitMetadata {
1555 self.inner.local_metadata.borrow().clone()
1556 }
1557
1558 pub fn set_local_operator_metadata(&self, id: NodeId, metadata: serde_json::Value) {
1560 self.inner
1561 .local_metadata
1562 .borrow_mut()
1563 .metadata
1564 .insert(id, metadata.clone());
1565 }
1566
1567 pub fn clear_local_operator_metadata(&self, id: NodeId) {
1569 self.inner.local_metadata.borrow_mut().metadata.remove(&id);
1570 }
1571
1572 pub fn set_local_operator_metadata_typed<T>(&self, id: NodeId, metadata: T)
1574 where
1575 T: Serialize,
1576 {
1577 self.inner
1578 .local_metadata
1579 .borrow_mut()
1580 .metadata
1581 .insert(id, serde_json::to_value(metadata).unwrap());
1582 }
1583
1584 pub fn get_local_operator_metadata(&self, id: NodeId) -> Option<serde_json::Value> {
1586 self.inner
1587 .local_metadata
1588 .borrow()
1589 .metadata
1590 .get(&id)
1591 .cloned()
1592 }
1593
1594 pub fn get_local_operator_metadata_typed<T>(&self, id: NodeId) -> Option<T>
1595 where
1596 T: DeserializeOwned,
1597 {
1598 self.get_local_operator_metadata(id)
1599 .map(|val| serde_json::from_value::<T>(val).unwrap())
1600 }
1601
1602 pub fn set_global_metadata(&self, global_metadata: Vec<CircuitMetadata>) {
1604 *self.inner.global_metadata.borrow_mut() = global_metadata;
1605 }
1606
1607 pub fn get_global_metadata(&self) -> Vec<CircuitMetadata> {
1608 self.inner.global_metadata.borrow().clone()
1609 }
1610
1611 pub fn get_global_operator_metadata(&self, id: NodeId) -> Vec<Option<serde_json::Value>> {
1613 self.inner
1614 .global_metadata
1615 .borrow()
1616 .iter()
1617 .map(|global_metadata| global_metadata.metadata.get(&id).cloned())
1618 .collect()
1619 }
1620
1621 pub fn get_global_operator_metadata_typed<T>(&self, id: NodeId) -> Vec<Option<T>>
1628 where
1629 T: DeserializeOwned,
1630 {
1631 self.inner
1632 .global_metadata
1633 .borrow()
1634 .iter()
1635 .map(|global_metadata| {
1636 global_metadata
1637 .metadata
1638 .get(&id)
1639 .cloned()
1640 .map(|val| serde_json::from_value::<T>(val).unwrap())
1641 })
1642 .collect()
1643 }
1644}
1645
1646pub trait CircuitBase: 'static {
1648 fn edges(&self) -> Ref<'_, Edges>;
1649
1650 fn edges_mut(&self) -> RefMut<'_, Edges>;
1651
1652 fn global_id(&self) -> &GlobalNodeId;
1656
1657 fn num_nodes(&self) -> usize;
1659
1660 fn node_ids(&self) -> Vec<NodeId>;
1662
1663 fn import_nodes(&self) -> Vec<NodeId>;
1664
1665 fn clear(&mut self);
1666
1667 fn add_dependency(&self, from: NodeId, to: NodeId);
1672
1673 fn transitive_ancestors(&self) -> BTreeMap<NodeId, BTreeSet<NodeId>>;
1676
1677 fn allocate_stream_id(&self) -> StreamId;
1680
1681 fn last_stream_id(&self) -> RefCell<StreamId>;
1683
1684 fn root_scope(&self) -> Scope;
1689
1690 fn node_id(&self) -> NodeId;
1692
1693 fn global_node_id(&self) -> GlobalNodeId;
1695
1696 fn map_node_relative(&self, path: &[NodeId], f: &mut dyn FnMut(&dyn Node));
1698
1699 fn map_node_mut_relative(&self, path: &[NodeId], f: &mut dyn FnMut(&mut dyn Node));
1701
1702 fn map_nodes_recursive(
1706 &self,
1707 f: &mut dyn FnMut(&dyn Node) -> Result<(), DbspError>,
1708 ) -> Result<(), DbspError>;
1709
1710 fn map_nodes_recursive_mut(
1714 &mut self,
1715 f: &mut dyn FnMut(&mut dyn Node) -> Result<(), DbspError>,
1716 ) -> Result<(), DbspError>;
1717
1718 fn map_local_nodes(
1722 &self,
1723 f: &mut dyn FnMut(&dyn Node) -> Result<(), DbspError>,
1724 ) -> Result<(), DbspError>;
1725
1726 fn map_local_nodes_mut(
1728 &self,
1729 f: &mut dyn FnMut(&mut dyn Node) -> Result<(), DbspError>,
1730 ) -> Result<(), DbspError>;
1731
1732 fn apply_local_node_mut(&self, id: NodeId, f: &mut dyn FnMut(&mut dyn Node));
1738
1739 fn map_subcircuits(
1743 &self,
1744 f: &mut dyn FnMut(&dyn CircuitBase) -> Result<(), DbspError>,
1745 ) -> Result<(), DbspError>;
1746
1747 fn set_node_label(&self, id: &GlobalNodeId, key: &str, val: &str);
1754
1755 fn set_persistent_node_id(&self, id: &GlobalNodeId, persistent_id: Option<&str>) {
1756 if let Some(persistent_id) = persistent_id {
1757 self.set_node_label(id, LABEL_PERSISTENT_OPERATOR_ID, persistent_id);
1758 }
1759 }
1760
1761 fn set_mir_node_id(&self, id: &GlobalNodeId, mir_id: Option<&str>) {
1762 if let Some(mir_id) = mir_id {
1763 self.set_node_label(id, LABEL_MIR_NODE_ID, mir_id);
1764 }
1765 }
1766
1767 fn get_node_label(&self, id: &GlobalNodeId, key: &str) -> Option<String>;
1774
1775 fn get_persistent_node_id(&self, id: &GlobalNodeId) -> Option<String> {
1777 self.get_node_label(id, LABEL_PERSISTENT_OPERATOR_ID)
1778 }
1779
1780 fn check_fixedpoint(&self, scope: Scope) -> bool;
1781
1782 fn notify_start_transaction(&self) {
1783 let _ = self.map_local_nodes_mut(&mut |node| {
1784 node.start_transaction();
1785 Ok(())
1786 });
1787 }
1788
1789 fn metadata_exchange(&self) -> &MetadataExchange;
1791
1792 fn balancer(&self) -> &Balancer;
1794
1795 fn set_balancer_hint(
1802 &self,
1803 global_node_id: &GlobalNodeId,
1804 hint: BalancerHint,
1805 ) -> Result<(), DbspError>;
1806
1807 fn get_current_balancer_policy(&self) -> BTreeMap<NodeId, PartitioningPolicy>;
1809}
1810
1811pub trait Circuit: CircuitBase + Clone + WithClock {
1824 type Parent;
1826
1827 fn parent(&self) -> Self::Parent;
1829
1830 fn root_circuit(&self) -> RootCircuit;
1832
1833 fn ptr_eq(this: &Self, other: &Self) -> bool;
1835
1836 fn circuit_event_handlers(&self) -> CircuitEventHandlers;
1838
1839 fn scheduler_event_handlers(&self) -> SchedulerEventHandlers;
1841
1842 fn log_circuit_event(&self, event: &CircuitEvent);
1844
1845 fn log_scheduler_event(&self, event: &SchedulerEvent<'_>);
1847
1848 fn map_node<T>(&self, id: &GlobalNodeId, f: &mut dyn FnMut(&dyn Node) -> T) -> T;
1855
1856 fn map_node_mut<T>(&self, id: &GlobalNodeId, f: &mut dyn FnMut(&mut dyn Node) -> T) -> T;
1863
1864 fn map_local_node_mut<T>(&self, id: NodeId, f: &mut dyn FnMut(&mut dyn Node) -> T) -> T;
1870
1871 fn cache_get_or_insert_with<K, F>(&self, key: K, f: F) -> RefMut<'_, K::Value>
1876 where
1877 K: 'static + TypedMapKey<CircuitStoreMarker>,
1878 F: FnMut() -> K::Value;
1879
1880 fn tick(&self);
1883
1884 fn clock_start(&self, scope: Scope);
1886
1887 fn clock_end(&self, scope: Scope);
1889
1890 fn ready(&self, id: NodeId) -> bool;
1893
1894 fn cache_insert<K>(&self, key: K, val: K::Value)
1899 where
1900 K: TypedMapKey<CircuitStoreMarker> + 'static;
1901
1902 fn cache_contains<K>(&self, key: &K) -> bool
1903 where
1904 K: TypedMapKey<CircuitStoreMarker> + 'static;
1905
1906 fn cache_get<K>(&self, key: &K) -> Option<K::Value>
1907 where
1908 K: TypedMapKey<CircuitStoreMarker> + 'static,
1909 K::Value: Clone;
1910
1911 fn get_replay_source(&self, stream_id: StreamId) -> Option<Box<dyn StreamMetadata>> {
1914 self.cache_get(&ReplaySource::new(stream_id))
1915 }
1916
1917 fn add_replay_edges(&self, stream_id: StreamId, replay_stream: &dyn StreamMetadata);
1920
1921 fn connect_stream<T: 'static>(
1923 &self,
1924 stream: &Stream<Self, T>,
1925 to: NodeId,
1926 ownership_preference: OwnershipPreference,
1927 );
1928
1929 fn register_ready_callback(&self, id: NodeId, cb: Box<dyn Fn() + Send + Sync>);
1930
1931 fn is_async_node(&self, id: NodeId) -> bool;
1932
1933 fn eval_node(
1937 &self,
1938 id: NodeId,
1939 ) -> impl Future<Output = Result<Option<Position>, SchedulerError>>;
1940
1941 fn eval_import_node(&self, id: NodeId);
1943
1944 fn flush_node(&self, id: NodeId);
1945
1946 fn is_flush_complete(&self, id: NodeId) -> bool;
1947
1948 #[track_caller]
1956 fn region<F, T>(&self, name: &str, f: F) -> T
1957 where
1958 F: FnOnce() -> T;
1959
1960 fn add_preprocessor(&self, preprocessor_node_id: NodeId);
1964
1965 fn add_source<O, Op>(&self, operator: Op) -> Stream<Self, O>
1967 where
1968 O: Data,
1969 Op: SourceOperator<O>;
1970
1971 fn add_exchange<I, SndOp, O, RcvOp>(
1999 &self,
2000 sender: SndOp,
2001 receiver: RcvOp,
2002 input_stream: &Stream<Self, I>,
2003 ) -> Stream<Self, O>
2004 where
2005 I: Data,
2006 O: Data,
2007 SndOp: SinkOperator<I>,
2008 RcvOp: SourceOperator<O>;
2009
2010 fn add_exchange_with_preference<I, SndOp, O, RcvOp>(
2013 &self,
2014 sender: SndOp,
2015 receiver: RcvOp,
2016 input_stream: &Stream<Self, I>,
2017 input_preference: OwnershipPreference,
2018 ) -> Stream<Self, O>
2019 where
2020 I: Data,
2021 O: Data,
2022 SndOp: SinkOperator<I>,
2023 RcvOp: SourceOperator<O>;
2024
2025 fn add_sink<I, Op>(&self, operator: Op, input_stream: &Stream<Self, I>) -> GlobalNodeId
2027 where
2028 I: Data,
2029 Op: SinkOperator<I>;
2030
2031 fn add_sink_with_preference<I, Op>(
2034 &self,
2035 operator: Op,
2036 input_stream: &Stream<Self, I>,
2037 input_preference: OwnershipPreference,
2038 ) -> GlobalNodeId
2039 where
2040 I: Data,
2041 Op: SinkOperator<I>;
2042
2043 fn add_binary_sink<I1, I2, Op>(
2045 &self,
2046 operator: Op,
2047 input_stream1: &Stream<Self, I1>,
2048 input_stream2: &Stream<Self, I2>,
2049 ) where
2050 I1: Data,
2051 I2: Data,
2052 Op: BinarySinkOperator<I1, I2>;
2053
2054 fn add_binary_sink_with_preference<I1, I2, Op>(
2058 &self,
2059 operator: Op,
2060 input_stream1: (&Stream<Self, I1>, OwnershipPreference),
2061 input_stream2: (&Stream<Self, I2>, OwnershipPreference),
2062 ) where
2063 I1: Data,
2064 I2: Data,
2065 Op: BinarySinkOperator<I1, I2>;
2066
2067 fn add_ternary_sink<I1, I2, I3, Op>(
2069 &self,
2070 operator: Op,
2071 input_stream1: &Stream<Self, I1>,
2072 input_stream2: &Stream<Self, I2>,
2073 input_stream3: &Stream<Self, I3>,
2074 ) -> GlobalNodeId
2075 where
2076 I1: Data,
2077 I2: Data,
2078 I3: Data,
2079 Op: TernarySinkOperator<I1, I2, I3>;
2080
2081 fn add_ternary_sink_with_preference<I1, I2, I3, Op>(
2084 &self,
2085 operator: Op,
2086 input_stream1: (&Stream<Self, I1>, OwnershipPreference),
2087 input_stream2: (&Stream<Self, I2>, OwnershipPreference),
2088 input_stream3: (&Stream<Self, I3>, OwnershipPreference),
2089 ) -> GlobalNodeId
2090 where
2091 I1: Data,
2092 I2: Data,
2093 I3: Data,
2094 Op: TernarySinkOperator<I1, I2, I3>;
2095
2096 fn add_unary_operator<I, O, Op>(
2098 &self,
2099 operator: Op,
2100 input_stream: &Stream<Self, I>,
2101 ) -> Stream<Self, O>
2102 where
2103 I: Data,
2104 O: Data,
2105 Op: UnaryOperator<I, O>;
2106
2107 fn add_unary_operator_with_preference<I, O, Op>(
2110 &self,
2111 operator: Op,
2112 input_stream: &Stream<Self, I>,
2113 input_preference: OwnershipPreference,
2114 ) -> Stream<Self, O>
2115 where
2116 I: Data,
2117 O: Data,
2118 Op: UnaryOperator<I, O>;
2119
2120 fn add_binary_operator<I1, I2, O, Op>(
2122 &self,
2123 operator: Op,
2124 input_stream1: &Stream<Self, I1>,
2125 input_stream2: &Stream<Self, I2>,
2126 ) -> Stream<Self, O>
2127 where
2128 I1: Data,
2129 I2: Data,
2130 O: Data,
2131 Op: BinaryOperator<I1, I2, O>;
2132
2133 fn add_binary_operator_with_preference<I1, I2, O, Op>(
2137 &self,
2138 operator: Op,
2139 input_stream1: (&Stream<Self, I1>, OwnershipPreference),
2140 input_stream2: (&Stream<Self, I2>, OwnershipPreference),
2141 ) -> Stream<Self, O>
2142 where
2143 I1: Data,
2144 I2: Data,
2145 O: Data,
2146 Op: BinaryOperator<I1, I2, O>;
2147
2148 fn add_ternary_operator<I1, I2, I3, O, Op>(
2150 &self,
2151 operator: Op,
2152 input_stream1: &Stream<Self, I1>,
2153 input_stream2: &Stream<Self, I2>,
2154 input_stream3: &Stream<Self, I3>,
2155 ) -> Stream<Self, O>
2156 where
2157 I1: Data,
2158 I2: Data,
2159 I3: Data,
2160 O: Data,
2161 Op: TernaryOperator<I1, I2, I3, O>;
2162
2163 #[allow(clippy::too_many_arguments)]
2166 fn add_ternary_operator_with_preference<I1, I2, I3, O, Op>(
2167 &self,
2168 operator: Op,
2169 input_stream1: (&Stream<Self, I1>, OwnershipPreference),
2170 input_stream2: (&Stream<Self, I2>, OwnershipPreference),
2171 input_stream3: (&Stream<Self, I3>, OwnershipPreference),
2172 ) -> Stream<Self, O>
2173 where
2174 I1: Data,
2175 I2: Data,
2176 I3: Data,
2177 O: Data,
2178 Op: TernaryOperator<I1, I2, I3, O>;
2179
2180 fn add_quaternary_operator<I1, I2, I3, I4, O, Op>(
2182 &self,
2183 operator: Op,
2184 input_stream1: &Stream<Self, I1>,
2185 input_stream2: &Stream<Self, I2>,
2186 input_stream3: &Stream<Self, I3>,
2187 input_stream4: &Stream<Self, I4>,
2188 ) -> Stream<Self, O>
2189 where
2190 I1: Data,
2191 I2: Data,
2192 I3: Data,
2193 I4: Data,
2194 O: Data,
2195 Op: QuaternaryOperator<I1, I2, I3, I4, O>;
2196
2197 #[allow(clippy::too_many_arguments)]
2200 fn add_quaternary_operator_with_preference<I1, I2, I3, I4, O, Op>(
2201 &self,
2202 operator: Op,
2203 input_stream1: (&Stream<Self, I1>, OwnershipPreference),
2204 input_stream2: (&Stream<Self, I2>, OwnershipPreference),
2205 input_stream3: (&Stream<Self, I3>, OwnershipPreference),
2206 input_stream4: (&Stream<Self, I4>, OwnershipPreference),
2207 ) -> Stream<Self, O>
2208 where
2209 I1: Data,
2210 I2: Data,
2211 I3: Data,
2212 I4: Data,
2213 O: Data,
2214 Op: QuaternaryOperator<I1, I2, I3, I4, O>;
2215
2216 fn add_nary_operator<'a, I, O, Op, Iter>(
2218 &'a self,
2219 operator: Op,
2220 input_streams: Iter,
2221 ) -> Stream<Self, O>
2222 where
2223 I: Data,
2224 O: Data,
2225 Op: NaryOperator<I, O>,
2226 Iter: IntoIterator<Item = &'a Stream<Self, I>>;
2227
2228 fn add_nary_operator_with_preference<'a, I, O, Op, Iter>(
2231 &'a self,
2232 operator: Op,
2233 input_streams: Iter,
2234 input_preference: OwnershipPreference,
2235 ) -> Stream<Self, O>
2236 where
2237 I: Data,
2238 O: Data,
2239 Op: NaryOperator<I, O>,
2240 Iter: IntoIterator<Item = &'a Stream<Self, I>>;
2241
2242 fn add_feedback<I, O, Op>(
2292 &self,
2293 operator: Op,
2294 ) -> (Stream<Self, O>, FeedbackConnector<Self, I, O, Op>)
2295 where
2296 I: Data,
2297 O: Data,
2298 Op: StrictUnaryOperator<I, O>;
2299
2300 fn add_feedback_persistent<I, O, Op>(
2302 &self,
2303 persistent_id: Option<&str>,
2304 operator: Op,
2305 ) -> (Stream<Self, O>, FeedbackConnector<Self, I, O, Op>)
2306 where
2307 I: Data,
2308 O: Data,
2309 Op: StrictUnaryOperator<I, O>,
2310 {
2311 let (output, feedback) = self.add_feedback(operator);
2312
2313 output.set_persistent_id(persistent_id);
2314
2315 (output, feedback)
2316 }
2317
    /// Like [`Self::add_feedback`], but the output is returned as an
    /// [`ExportStream`] instead of a plain stream.
    ///
    /// NOTE(review): presumably the export stream additionally makes the
    /// operator's output available to the parent circuit -- confirm against
    /// `ExportStream`.
    fn add_feedback_with_export<I, O, Op>(
        &self,
        operator: Op,
    ) -> (ExportStream<Self, O>, FeedbackConnector<Self, I, O, Op>)
    where
        I: Data,
        O: Data,
        Op: StrictUnaryOperator<I, O>;
2339
2340 fn add_feedback_with_export_persistent<I, O, Op>(
2342 &self,
2343 persistent_id: Option<&str>,
2344 operator: Op,
2345 ) -> (ExportStream<Self, O>, FeedbackConnector<Self, I, O, Op>)
2346 where
2347 I: Data,
2348 O: Data,
2349 Op: StrictUnaryOperator<I, O>,
2350 {
2351 let (export, feedback) = self.add_feedback_with_export(operator);
2352
2353 export.local.set_persistent_id(persistent_id);
2354
2355 (export, feedback)
2356 }
2357
    /// Connects `input_stream` to a feedback operator with the given
    /// ownership preference.
    ///
    /// `output_node_id` identifies a node in this circuit and `operator` is a
    /// shared handle to the operator instance.
    ///
    /// NOTE(review): presumably `output_node_id` is the node created by
    /// `add_feedback` and this method is what a `FeedbackConnector` calls to
    /// close the loop -- confirm against the implementor.
    fn connect_feedback_with_preference<I, O, Op>(
        &self,
        output_node_id: NodeId,
        operator: Rc<RefCell<Op>>,
        input_stream: &Stream<Self, I>,
        input_preference: OwnershipPreference,
    ) where
        I: Data,
        O: Data,
        Op: StrictUnaryOperator<I, O>;
2368
    /// Builds a child circuit of type [`IterativeCircuit<Self>`].
    ///
    /// `child_constructor` receives the new child circuit and returns a value
    /// of type `T` together with an [`Executor`] for the child; on success
    /// the `T` value is returned to the caller.
    ///
    /// NOTE(review): how the executor is installed is defined by the
    /// implementor and not visible in this declaration.
    fn iterative_subcircuit<F, T, E>(&self, child_constructor: F) -> Result<T, SchedulerError>
    where
        F: FnOnce(&mut IterativeCircuit<Self>) -> Result<(T, E), SchedulerError>,
        E: Executor<IterativeCircuit<Self>>;
2386
    /// Builds a child circuit of type [`NonIterativeCircuit<Self>`], i.e. a
    /// child whose timestamp type is the same as this circuit's clock time.
    ///
    /// `child_constructor` receives the new child circuit and returns a value
    /// of type `T` together with an [`Executor`] for the child; on success
    /// the `T` value is returned to the caller.
    fn non_iterative_subcircuit<F, T, E>(&self, child_constructor: F) -> Result<T, SchedulerError>
    where
        F: FnOnce(&mut NonIterativeCircuit<Self>) -> Result<(T, E), SchedulerError>,
        E: Executor<NonIterativeCircuit<Self>>;
2393
    /// Builds an iterative child circuit from `constructor`.
    ///
    /// The constructor returns an async callback `C` yielding
    /// `Result<bool, SchedulerError>` plus a value `T` that is handed back to
    /// the caller.
    ///
    /// NOTE(review): presumably `C` is the termination check evaluated after
    /// each iteration of the child circuit -- confirm in the implementor.
    fn iterate<F, C, T>(&self, constructor: F) -> Result<T, SchedulerError>
    where
        F: FnOnce(&mut IterativeCircuit<Self>) -> Result<(C, T), SchedulerError>,
        C: AsyncFn() -> Result<bool, SchedulerError> + 'static;
2468
    /// Same contract as [`Self::iterate`], with the scheduler type chosen by
    /// the caller through the type parameter `S`.
    ///
    /// NOTE(review): presumably `S` schedules the operators of the child
    /// circuit -- confirm in the implementor.
    fn iterate_with_scheduler<F, C, T, S>(&self, constructor: F) -> Result<T, SchedulerError>
    where
        F: FnOnce(&mut IterativeCircuit<Self>) -> Result<(C, T), SchedulerError>,
        C: AsyncFn() -> Result<bool, SchedulerError> + 'static,
        S: Scheduler + 'static;
2478
    /// Builds an iterative child circuit from `constructor`, which only
    /// returns a value `T` (no explicit termination callback).
    ///
    /// NOTE(review): judging by the name, the child presumably iterates until
    /// its operators report a fixed point -- confirm in the implementor.
    fn fixedpoint<F, T>(&self, constructor: F) -> Result<T, SchedulerError>
    where
        F: FnOnce(&mut IterativeCircuit<Self>) -> Result<T, SchedulerError>;
2517
    /// Same contract as [`Self::fixedpoint`], with the scheduler type chosen
    /// by the caller through the type parameter `S`.
    fn fixedpoint_with_scheduler<F, T, S>(&self, constructor: F) -> Result<T, SchedulerError>
    where
        F: FnOnce(&mut IterativeCircuit<Self>) -> Result<T, SchedulerError>,
        S: Scheduler + 'static;
2526
    /// Makes a stream of the parent circuit available inside this circuit.
    ///
    /// `parent_stream` belongs to `Self::Parent`; the returned stream belongs
    /// to `self` and carries values of type `O` produced by the
    /// [`ImportOperator`].
    ///
    /// NOTE(review): presumably the operator's own input preference is used
    /// for the connection -- confirm in the implementor.
    fn import_stream<I, O, Op>(
        &self,
        operator: Op,
        parent_stream: &Stream<Self::Parent, I>,
    ) -> Stream<Self, O>
    where
        Self::Parent: Circuit,
        I: Data,
        O: Data,
        Op: ImportOperator<I, O>;
2541
    /// Like [`Self::import_stream`], but with the ownership preference for the
    /// parent stream given explicitly by the caller.
    fn import_stream_with_preference<I, O, Op>(
        &self,
        operator: Op,
        parent_stream: &Stream<Self::Parent, I>,
        input_preference: OwnershipPreference,
    ) -> Stream<Self, O>
    where
        Self::Parent: Circuit,
        I: Data,
        O: Data,
        Op: ImportOperator<I, O>;
2555}
2556
/// The edges of a circuit, indexed three ways so that lookups by source node,
/// destination node, and stream do not require a scan.
///
/// Every edge is stored once behind an `Rc` and referenced from all three
/// indexes.
pub struct Edges {
    /// Edges keyed by the node they originate from (`edge.from`).
    by_source: BTreeMap<NodeId, Vec<Rc<Edge>>>,
    /// Edges keyed by the node they point to (`edge.to`).
    by_destination: BTreeMap<NodeId, Vec<Rc<Edge>>>,
    /// Edges keyed by the id of the stream they carry; edges without a stream
    /// are stored under `None`.
    by_stream: BTreeMap<Option<StreamId>, Vec<Rc<Edge>>>,
}
2563
2564impl Edges {
2565 fn new() -> Self {
2566 Self {
2567 by_source: BTreeMap::new(),
2568 by_destination: BTreeMap::new(),
2569 by_stream: BTreeMap::new(),
2570 }
2571 }
2572
2573 fn add_edge(&mut self, edge: Edge) {
2574 let edge = Rc::new(edge);
2575
2576 self.by_source
2577 .entry(edge.from)
2578 .or_default()
2579 .push(edge.clone());
2580 self.by_destination
2581 .entry(edge.to)
2582 .or_default()
2583 .push(edge.clone());
2584
2585 self.by_stream
2586 .entry(edge.stream.as_ref().map(|s| s.stream_id()))
2587 .or_default()
2588 .push(edge);
2589 }
2590
2591 fn extend<I>(&mut self, edges: I)
2592 where
2593 I: IntoIterator<Item = Edge>,
2594 {
2595 for edge in edges {
2596 self.add_edge(edge)
2597 }
2598 }
2599
2600 pub(crate) fn iter(&self) -> impl Iterator<Item = &Edge> {
2601 self.by_source
2602 .values()
2603 .flat_map(|edges| edges.iter().map(|edge| edge.as_ref()))
2604 }
2605
2606 pub(crate) fn get_by_stream_id(&self, stream_id: &Option<StreamId>) -> Option<&[Rc<Edge>]> {
2607 self.by_stream.get(stream_id).map(|v| v.as_slice())
2608 }
2609
2610 fn delete_stream(&mut self, stream_id: StreamId) {
2611 if let Some(edges) = self.by_stream.remove(&Some(stream_id)) {
2612 for edge in edges {
2613 if let Some(v) = self.by_source.get_mut(&edge.from) {
2614 v.retain(|e| e.stream_id() != Some(stream_id))
2615 }
2616 if let Some(v) = self.by_destination.get_mut(&edge.to) {
2617 v.retain(|e| e.stream_id() != Some(stream_id))
2618 }
2619 }
2620 }
2621 }
2622
2623 pub(crate) fn inputs_of(&self, node_id: NodeId) -> impl Iterator<Item = &Edge> {
2624 self.by_destination
2625 .get(&node_id)
2626 .into_iter()
2627 .flatten()
2628 .map(|edge| edge.as_ref())
2629 }
2630
2631 pub(crate) fn depend_on(&self, node_id: NodeId) -> impl Iterator<Item = &Edge> {
2635 self.by_source.get(&node_id).into_iter().flat_map(|edges| {
2636 edges.iter().filter_map(|edge| {
2637 if edge.is_dependency() {
2638 Some(edge.as_ref())
2639 } else {
2640 None
2641 }
2642 })
2643 })
2644 }
2645
2646 pub(crate) fn dependencies_of(&self, node_id: NodeId) -> impl Iterator<Item = &Edge> {
2650 self.by_destination
2651 .get(&node_id)
2652 .into_iter()
2653 .flat_map(|edges| {
2654 edges.iter().filter_map(|edge| {
2655 if edge.is_dependency() {
2656 Some(edge.as_ref())
2657 } else {
2658 None
2659 }
2660 })
2661 })
2662 }
2663
2664 fn clear(&mut self) {
2665 *self = Self::new();
2666 }
2667}
2668
/// Shared state of a circuit; `ChildCircuit` wraps it in an `Rc`.
///
/// `P` is the type of the parent circuit (`()` for the root circuit, see
/// `RootCircuit::new`).
struct CircuitInner<P>
where
    P: 'static,
{
    /// The parent circuit.
    parent: P,

    /// The root circuit: `None` when this is the root itself, `Some` for
    /// circuits created by `ChildCircuit::with_parent`.
    root: Option<RootCircuit>,

    /// `0` for the root circuit; the parent's value plus one for a child.
    root_scope: Scope,

    /// Id of this circuit's node, local to its parent.
    node_id: NodeId,
    /// Id of this circuit's node, relative to the root circuit.
    global_node_id: GlobalNodeId,
    /// Nodes of this circuit, indexed by `NodeId`.  Each node sits in its own
    /// `RefCell` so that individual nodes can be borrowed mutably.
    nodes: RefCell<Vec<RefCell<Box<dyn Node>>>>,
    /// Edges between the nodes of this circuit.
    edges: RefCell<Edges>,
    /// Ids registered through `add_import_node`.
    import_nodes: RefCell<Vec<NodeId>>,
    /// Circuit event handlers, keyed by name.
    circuit_event_handlers: CircuitEventHandlers,
    /// Scheduler event handlers, keyed by name.
    scheduler_event_handlers: SchedulerEventHandlers,
    /// Typed key/value cache attached to this circuit.
    store: RefCell<CircuitCache>,
    /// Last stream id handed out; incremented by `allocate_stream_id`.
    last_stream_id: RefCell<StreamId>,
    /// Metadata exchange created together with the circuit.
    metadata_exchange: MetadataExchange,
    /// Balancer constructed from `metadata_exchange`.
    balancer: Rc<Balancer>,
}
2696
2697impl<P> CircuitInner<P>
2698where
2699 P: 'static,
2700{
2701 #[allow(clippy::too_many_arguments)]
2702 fn new(
2703 parent: P,
2704 root: Option<RootCircuit>,
2705 root_scope: Scope,
2706 node_id: NodeId,
2707 global_node_id: GlobalNodeId,
2708 circuit_event_handlers: CircuitEventHandlers,
2709 scheduler_event_handlers: SchedulerEventHandlers,
2710 last_stream_id: RefCell<StreamId>,
2711 ) -> Self {
2712 let metadata_exchange = MetadataExchange::new();
2713
2714 Self {
2715 parent,
2716 root,
2717 root_scope,
2718 node_id,
2719 global_node_id,
2720 nodes: RefCell::new(Vec::new()),
2721 edges: RefCell::new(Edges::new()),
2722 import_nodes: RefCell::new(Vec::new()),
2723 circuit_event_handlers,
2724 scheduler_event_handlers,
2725 store: RefCell::new(TypedMap::new()),
2726 last_stream_id,
2727 metadata_exchange: metadata_exchange.clone(),
2728 balancer: Rc::new(Balancer::new(&metadata_exchange)),
2729 }
2730 }
2731
2732 fn add_edge(&self, edge: Edge) {
2733 self.edges.borrow_mut().add_edge(edge);
2734 }
2735
2736 fn add_node<N>(&self, mut node: N)
2737 where
2738 N: Node + 'static,
2739 {
2740 node.init();
2741 self.nodes
2742 .borrow_mut()
2743 .push(RefCell::new(Box::new(node) as Box<dyn Node>));
2744 }
2745
2746 fn add_import_node(&self, node_id: NodeId) {
2747 self.import_nodes.borrow_mut().push(node_id);
2748 }
2749
2750 fn import_nodes(&self) -> Vec<NodeId> {
2751 self.import_nodes.borrow().clone()
2752 }
2753
2754 fn clear(&self) {
2755 self.nodes.borrow_mut().clear();
2756 self.edges.borrow_mut().clear();
2757 self.store.borrow_mut().clear();
2758 }
2759
2760 fn register_circuit_event_handler<F>(&self, name: &str, handler: F)
2761 where
2762 F: Fn(&CircuitEvent) + 'static,
2763 {
2764 self.circuit_event_handlers.borrow_mut().insert(
2765 name.to_string(),
2766 Box::new(handler) as Box<dyn Fn(&CircuitEvent)>,
2767 );
2768 }
2769
2770 fn unregister_circuit_event_handler(&self, name: &str) -> bool {
2771 self.circuit_event_handlers
2772 .borrow_mut()
2773 .remove(name)
2774 .is_some()
2775 }
2776
2777 fn register_scheduler_event_handler<F>(&self, name: &str, handler: F)
2778 where
2779 F: FnMut(&SchedulerEvent<'_>) + 'static,
2780 {
2781 self.scheduler_event_handlers.borrow_mut().insert(
2782 name.to_string(),
2783 Box::new(handler) as Box<dyn FnMut(&SchedulerEvent<'_>)>,
2784 );
2785 }
2786
2787 fn unregister_scheduler_event_handler(&self, name: &str) -> bool {
2788 self.scheduler_event_handlers
2789 .borrow_mut()
2790 .remove(name)
2791 .is_some()
2792 }
2793
2794 fn log_circuit_event(&self, event: &CircuitEvent) {
2795 for (_, handler) in self.circuit_event_handlers.borrow().iter() {
2796 handler(event)
2797 }
2798 }
2799
2800 fn log_scheduler_event(&self, event: &SchedulerEvent<'_>) {
2801 for (_, handler) in self.scheduler_event_handlers.borrow_mut().iter_mut() {
2802 handler(event)
2803 }
2804 }
2805
2806 fn check_fixedpoint(&self, scope: Scope) -> bool {
2807 self.nodes.borrow().iter().all(|node| {
2808 node.borrow().fixedpoint(scope)
2816 })
2817 }
2818}
2819
/// A circuit with parent type `P` and timestamp type `T`.
///
/// The value is a cheap handle: both fields are reference counted, so clones
/// refer to the same underlying circuit and clock.
pub struct ChildCircuit<P, T>
where
    P: 'static,
    T: Timestamp,
{
    /// Shared circuit state (nodes, edges, handlers, cache, ...).
    inner: Rc<CircuitInner<P>>,
    /// Current time of this circuit's clock.
    time: Rc<RefCell<T>>,
}
2833
/// Top-level circuit: it has no parent (`()`) and the unit timestamp.
pub type RootCircuit = ChildCircuit<(), ()>;

/// Circuit nested directly inside a [`RootCircuit`]; its timestamp is the
/// nested timestamp of the root's unit time.
pub type NestedCircuit = ChildCircuit<RootCircuit, <() as Timestamp>::Nested>;

/// Child of `P` whose timestamp is the nested timestamp of the parent's clock
/// time.
pub type IterativeCircuit<P> = ChildCircuit<P, <<P as WithClock>::Time as Timestamp>::Nested>;

/// Child of `P` that uses the same timestamp type as the parent's clock.
pub type NonIterativeCircuit<P> = ChildCircuit<P, <P as WithClock>::Time>;
2857
2858impl<P, T> Clone for ChildCircuit<P, T>
2859where
2860 P: 'static,
2861 T: Timestamp,
2862{
2863 fn clone(&self) -> Self {
2864 Self {
2865 inner: self.inner.clone(),
2866 time: self.time.clone(),
2867 }
2868 }
2869}
2870
2871impl<P, T> ChildCircuit<P, T>
2872where
2873 P: 'static,
2874 T: Timestamp,
2875{
2876 fn inner(&self) -> &CircuitInner<P> {
2878 &self.inner
2879 }
2880}
2881
2882impl RootCircuit {
2883 pub fn build<F, T>(constructor: F) -> Result<(CircuitHandle, T), DbspError>
2927 where
2928 F: FnOnce(&mut RootCircuit) -> Result<T, AnyError>,
2929 {
2930 Self::build_with_scheduler::<F, T, DynamicScheduler>(constructor)
2931 }
2932
2933 pub fn build_with_scheduler<F, T, S>(constructor: F) -> Result<(CircuitHandle, T), DbspError>
2939 where
2940 F: FnOnce(&mut RootCircuit) -> Result<T, AnyError>,
2941 S: Scheduler + 'static,
2942 {
2943 let tokio_runtime = tokio::runtime::Builder::new_current_thread()
2947 .build()
2948 .map_err(|e| {
2949 DbspError::Scheduler(SchedulerError::TokioError {
2950 error: e.to_string(),
2951 })
2952 })?;
2953
2954 let mut circuit = RootCircuit::new();
2955 let res = constructor(&mut circuit).map_err(DbspError::Constructor)?;
2956 let mut executor = Box::new(<OnceExecutor<S>>::new()) as Box<dyn Executor<RootCircuit>>;
2957 executor.prepare(&circuit, None)?;
2958
2959 circuit.log_scheduler_event(&SchedulerEvent::clock_start());
2996 circuit.clock_start(0);
2997 Ok((
2998 CircuitHandle {
2999 circuit,
3000 executor,
3001 tokio_runtime,
3002 replay_info: None,
3003 },
3004 res,
3005 ))
3006 }
3007}
3008
3009impl RootCircuit {
3010 fn new() -> Self {
3013 Self {
3014 inner: Rc::new(CircuitInner::new(
3015 (),
3016 None,
3017 0,
3018 NodeId::root(),
3019 GlobalNodeId::root(),
3020 Rc::new(RefCell::new(HashMap::new())),
3021 Rc::new(RefCell::new(HashMap::new())),
3022 RefCell::new(StreamId::new(0)),
3023 )),
3024 time: Rc::new(RefCell::new(())),
3025 }
3026 }
3027}
3028
3029impl RootCircuit {
3030 pub fn register_circuit_event_handler<F>(&self, name: &str, handler: F)
3050 where
3051 F: Fn(&CircuitEvent) + 'static,
3052 {
3053 self.inner().register_circuit_event_handler(name, handler);
3054 }
3055
3056 pub fn unregister_circuit_event_handler(&self, name: &str) -> bool {
3059 self.inner().unregister_circuit_event_handler(name)
3060 }
3061
3062 pub fn register_scheduler_event_handler<F>(&self, name: &str, handler: F)
3077 where
3078 F: FnMut(&SchedulerEvent<'_>) + 'static,
3079 {
3080 self.inner().register_scheduler_event_handler(name, handler);
3081 }
3082
3083 pub fn unregister_scheduler_event_handler(&self, name: &str) -> bool {
3086 self.inner().unregister_scheduler_event_handler(name)
3087 }
3088}
3089
3090impl<P, T> ChildCircuit<P, T>
3091where
3092 P: Circuit,
3093 T: Timestamp,
3094{
3095 fn with_parent(parent: P, id: NodeId) -> Self {
3097 let global_node_id = parent.global_node_id().child(id);
3098 let circuit_handlers = parent.circuit_event_handlers();
3099 let sched_handlers = parent.scheduler_event_handlers();
3100 let root_scope = parent.root_scope() + 1;
3101 let last_stream_id = parent.last_stream_id();
3102
3103 let root = parent.root_circuit();
3104
3105 ChildCircuit {
3106 inner: Rc::new(CircuitInner::new(
3107 parent,
3108 Some(root),
3109 root_scope,
3110 id,
3111 global_node_id,
3112 circuit_handlers,
3113 sched_handlers,
3114 last_stream_id,
3115 )),
3116 time: Rc::new(RefCell::new(Timestamp::clock_start())),
3117 }
3118 }
3119
3120 pub fn is_child_of(&self, other: &P) -> bool {
3122 P::ptr_eq(&self.inner().parent, other)
3123 }
3124}
3125
3126impl<P, T> ChildCircuit<P, T>
3128where
3129 P: 'static,
3130 T: Timestamp,
3131 Self: Circuit,
3132{
3133 fn node_id(&self) -> NodeId {
3135 self.inner().node_id
3136 }
3137
3138 fn add_node<F, N, V>(&self, f: F) -> V
3144 where
3145 F: FnOnce(NodeId) -> (N, V),
3146 N: Node + 'static,
3147 {
3148 let id = self.inner().nodes.borrow().len();
3149
3150 let (node, res) = f(NodeId(id));
3153 self.inner().add_node(node);
3154 res
3155 }
3156
3157 fn add_import_node(&self, node_id: NodeId) {
3158 self.inner().add_import_node(node_id);
3159 }
3160
3161 fn try_add_node<F, N, V, E>(&self, f: F) -> Result<V, E>
3163 where
3164 F: FnOnce(NodeId) -> Result<(N, V), E>,
3165 N: Node + 'static,
3166 {
3167 let id = self.inner().nodes.borrow().len();
3168
3169 let (node, res) = f(NodeId(id))?;
3172 self.inner().add_node(node);
3173 Ok(res)
3174 }
3175
3176 fn log_circuit_event(&self, event: &CircuitEvent) {
3179 self.inner().log_circuit_event(event);
3180 }
3181
3182 pub(super) fn log_scheduler_event(&self, event: &SchedulerEvent<'_>) {
3185 self.inner().log_scheduler_event(event);
3186 }
3187}
3188
3189impl<P, T> CircuitBase for ChildCircuit<P, T>
3190where
3191 P: Clone + 'static,
3192 T: Timestamp,
3193{
3194 fn edges(&self) -> Ref<'_, Edges> {
3195 self.inner().edges.borrow()
3196 }
3197
3198 fn transitive_ancestors(&self) -> BTreeMap<NodeId, BTreeSet<NodeId>> {
3199 let edges = self.edges();
3200 let mut result = BTreeMap::new();
3201
3202 for node_id in self.node_ids() {
3204 let mut ancestors = BTreeSet::new();
3205 let mut queue = vec![node_id];
3206
3207 while let Some(current) = queue.pop() {
3209 for edge in edges.inputs_of(current) {
3210 let ancestor_node = edge.from;
3211 if ancestors.insert(ancestor_node) {
3212 queue.push(ancestor_node);
3213 }
3214 }
3215 }
3216
3217 result.insert(node_id, ancestors);
3218 }
3219
3220 result
3221 }
3222
3223 fn edges_mut(&self) -> RefMut<'_, Edges> {
3224 self.inner().edges.borrow_mut()
3225 }
3226
3227 fn num_nodes(&self) -> usize {
3228 self.inner().nodes.borrow().len()
3229 }
3230
3231 fn clear(&mut self) {
3232 self.inner().clear();
3233 }
3234
3235 fn add_dependency(&self, from: NodeId, to: NodeId) {
3236 self.log_circuit_event(&CircuitEvent::dependency(
3237 self.global_node_id().child(from),
3238 self.global_node_id().child(to),
3239 ));
3240
3241 let origin = self.global_node_id().child(from);
3242 self.inner().add_edge(Edge {
3243 from,
3244 to,
3245 origin,
3246 stream: None,
3247 ownership_preference: None,
3248 });
3249 }
3250
3251 fn map_node_relative(&self, path: &[NodeId], f: &mut dyn FnMut(&dyn Node)) {
3253 let nodes = self.inner().nodes.borrow();
3254 let node = nodes[path[0].0].borrow();
3255 if path.len() == 1 {
3256 f(node.as_ref())
3257 } else {
3258 node.map_child(&path[1..], &mut |node| f(node));
3259 }
3260 }
3261
3262 fn map_node_mut_relative(&self, path: &[NodeId], f: &mut dyn FnMut(&mut dyn Node)) {
3264 let nodes = self.inner().nodes.borrow();
3265 let mut node = nodes[path[0].0].borrow_mut();
3266 if path.len() == 1 {
3267 f(node.as_mut())
3268 } else {
3269 node.map_child_mut(&path[1..], &mut |node| f(node));
3270 }
3271 }
3272
3273 fn map_nodes_recursive(
3274 &self,
3275 f: &mut dyn FnMut(&dyn Node) -> Result<(), DbspError>,
3276 ) -> Result<(), DbspError> {
3277 for node in self.inner().nodes.borrow().iter() {
3278 f(node.borrow().as_ref())?;
3279 node.borrow().map_nodes_recursive(f)?;
3280 }
3281 Ok(())
3282 }
3283
3284 fn map_nodes_recursive_mut(
3285 &mut self,
3286 f: &mut dyn FnMut(&mut dyn Node) -> Result<(), DbspError>,
3287 ) -> Result<(), DbspError> {
3288 for node in self.inner().nodes.borrow_mut().iter_mut() {
3289 f(node.borrow_mut().as_mut())?;
3290 node.borrow_mut().map_nodes_recursive_mut(f)?;
3291 }
3292
3293 Ok(())
3294 }
3295
3296 fn map_local_nodes(
3297 &self,
3298 f: &mut dyn FnMut(&dyn Node) -> Result<(), DbspError>,
3299 ) -> Result<(), DbspError> {
3300 for node in self.inner().nodes.borrow().iter() {
3301 f(node.borrow().as_ref())?;
3302 }
3303 Ok(())
3304 }
3305
3306 fn map_local_nodes_mut(
3307 &self,
3308 f: &mut dyn FnMut(&mut dyn Node) -> Result<(), DbspError>,
3309 ) -> Result<(), DbspError> {
3310 for node in self.inner().nodes.borrow_mut().iter_mut() {
3311 f(node.borrow_mut().as_mut())?;
3312 }
3313
3314 Ok(())
3315 }
3316
3317 fn apply_local_node_mut(&self, id: NodeId, f: &mut dyn FnMut(&mut dyn Node)) {
3318 self.map_node_mut_relative(&[id], &mut |node| f(node));
3319 }
3320
3321 fn map_subcircuits(
3322 &self,
3323 f: &mut dyn FnMut(&dyn CircuitBase) -> Result<(), DbspError>,
3324 ) -> Result<(), DbspError> {
3325 for node in self.inner().nodes.borrow().iter() {
3326 let node = node.borrow();
3327 if let Some(child_circuit) = node.as_circuit() {
3328 f(child_circuit)?;
3329 }
3330 }
3331 Ok(())
3332 }
3333
3334 fn set_node_label(&self, id: &GlobalNodeId, key: &str, val: &str) {
3335 self.map_node_mut(id, &mut |node| node.set_label(key, val));
3336 }
3337
3338 fn get_node_label(&self, id: &GlobalNodeId, key: &str) -> Option<String> {
3339 self.map_node(id, &mut |node| node.get_label(key).map(str::to_string))
3340 }
3341
3342 fn global_id(&self) -> &GlobalNodeId {
3343 &self.inner().global_node_id
3344 }
3345
3346 fn node_ids(&self) -> Vec<NodeId> {
3348 self.inner()
3349 .nodes
3350 .borrow()
3351 .iter()
3352 .map(|node| node.borrow().local_id())
3353 .collect()
3354 }
3355
3356 fn import_nodes(&self) -> Vec<NodeId> {
3357 self.inner().import_nodes()
3358 }
3359
3360 fn allocate_stream_id(&self) -> StreamId {
3361 let circuit = self.inner();
3362 let mut last_stream_id = circuit.last_stream_id.borrow_mut();
3363 last_stream_id.0 += 1;
3364 *last_stream_id
3365 }
3366
3367 fn last_stream_id(&self) -> RefCell<StreamId> {
3368 self.inner().last_stream_id.clone()
3369 }
3370
3371 fn root_scope(&self) -> Scope {
3372 self.inner().root_scope
3373 }
3374
3375 fn node_id(&self) -> NodeId {
3376 self.inner().node_id
3377 }
3378
3379 fn global_node_id(&self) -> GlobalNodeId {
3380 self.inner().global_node_id.clone()
3381 }
3382
3383 fn check_fixedpoint(&self, scope: Scope) -> bool {
3384 self.inner().check_fixedpoint(scope)
3385 }
3386
3387 fn metadata_exchange(&self) -> &MetadataExchange {
3388 &self.inner().metadata_exchange
3389 }
3390
3391 fn balancer(&self) -> &Balancer {
3392 &self.inner().balancer
3393 }
3394
3395 fn set_balancer_hint(
3396 &self,
3397 global_node_id: &GlobalNodeId,
3398 hint: BalancerHint,
3399 ) -> Result<(), DbspError> {
3400 if global_node_id.parent_id() != Some(GlobalNodeId::root()) {
3401 return Err(DbspError::Balancer(BalancerError::NonTopLevelNode(
3402 global_node_id.clone(),
3403 )));
3404 }
3405
3406 self.inner()
3407 .balancer
3408 .set_hint(global_node_id.local_node_id().unwrap(), hint)
3409 }
3410
3411 fn get_current_balancer_policy(&self) -> BTreeMap<NodeId, PartitioningPolicy> {
3412 self.inner().balancer.get_policy()
3413 }
3414}
3415
3416impl<P, T> Circuit for ChildCircuit<P, T>
3417where
3418 P: Clone + 'static,
3419 T: Timestamp,
3420{
3421 type Parent = P;
3422
3423 fn parent(&self) -> P {
3424 self.inner().parent.clone()
3425 }
3426
3427 fn root_circuit(&self) -> RootCircuit {
3428 if <dyn Any>::is::<RootCircuit>(self) {
3429 unsafe { transmute::<&Self, &RootCircuit>(self) }.clone()
3430 } else {
3431 self.inner().root.as_ref().unwrap().clone()
3432 }
3433 }
3434
3435 fn map_node<V>(&self, id: &GlobalNodeId, f: &mut dyn FnMut(&dyn Node) -> V) -> V {
3436 let path = id.path();
3437 let mut result: Option<V> = None;
3438
3439 assert!(path.starts_with(self.global_id().path()));
3440
3441 self.map_node_relative(
3442 path.strip_prefix(self.global_id().path()).unwrap(),
3443 &mut |node| result = Some(f(node)),
3444 );
3445 result.unwrap()
3446 }
3447
3448 fn map_node_mut<V>(&self, id: &GlobalNodeId, f: &mut dyn FnMut(&mut dyn Node) -> V) -> V {
3449 let path = id.path();
3450 let mut result: Option<V> = None;
3451
3452 assert!(path.starts_with(self.global_id().path()));
3453
3454 self.map_node_mut_relative(
3455 path.strip_prefix(self.global_id().path()).unwrap(),
3456 &mut |node| result = Some(f(node)),
3457 );
3458 result.unwrap()
3459 }
3460
3461 fn map_local_node_mut<V>(&self, id: NodeId, f: &mut dyn FnMut(&mut dyn Node) -> V) -> V {
3462 let mut result: Option<V> = None;
3463
3464 self.map_node_mut_relative(&[id], &mut |node| result = Some(f(node)));
3465 result.unwrap()
3466 }
3467
3468 fn ptr_eq(this: &Self, other: &Self) -> bool {
3469 Rc::ptr_eq(&this.inner, &other.inner)
3470 }
3471
3472 fn circuit_event_handlers(&self) -> CircuitEventHandlers {
3473 self.inner().circuit_event_handlers.clone()
3474 }
3475
3476 fn scheduler_event_handlers(&self) -> SchedulerEventHandlers {
3477 self.inner().scheduler_event_handlers.clone()
3478 }
3479
3480 fn log_circuit_event(&self, event: &CircuitEvent) {
3481 self.inner().log_circuit_event(event);
3482 }
3483
3484 fn log_scheduler_event(&self, event: &SchedulerEvent<'_>) {
3485 self.inner().log_scheduler_event(event);
3486 }
3487
3488 fn cache_get_or_insert_with<K, F>(&self, key: K, mut f: F) -> RefMut<'_, K::Value>
3489 where
3490 K: 'static + TypedMapKey<CircuitStoreMarker>,
3491 F: FnMut() -> K::Value,
3492 {
3493 if self.inner().store.borrow().contains_key(&key) {
3496 return RefMut::map(self.inner().store.borrow_mut(), |store| {
3497 store.get_mut(&key).unwrap()
3498 });
3499 }
3500
3501 let new = f();
3502
3503 RefMut::map(self.inner().store.borrow_mut(), |store| {
3506 store.entry(key).or_insert(new)
3507 })
3508 }
3509
3510 fn connect_stream<V: 'static>(
3511 &self,
3512 stream: &Stream<Self, V>,
3513 to: NodeId,
3514 ownership_preference: OwnershipPreference,
3515 ) {
3516 self.log_circuit_event(&CircuitEvent::stream(
3517 stream.origin_node_id().clone(),
3518 self.global_node_id().child(to),
3519 ownership_preference,
3520 ));
3521
3522 debug_assert_eq!(self.global_node_id(), stream.circuit.global_node_id());
3523 self.inner().add_edge(Edge {
3524 from: stream.local_node_id(),
3525 to,
3526 origin: stream.origin_node_id().clone(),
3527 stream: Some(Box::new(stream.clone())),
3528 ownership_preference: Some(ownership_preference),
3529 });
3530 }
3531
3532 fn tick(&self) {
3533 let mut time = self.time.borrow_mut();
3534 *time = time.advance(0);
3535 }
3536
3537 fn clock_start(&self, scope: Scope) {
3538 for node in self.inner().nodes.borrow_mut().iter_mut() {
3539 node.borrow_mut().clock_start(scope);
3540 }
3541 }
3542
3543 fn clock_end(&self, scope: Scope) {
3544 for node in self.inner().nodes.borrow_mut().iter_mut() {
3545 node.borrow_mut().clock_end(scope);
3546 }
3547
3548 let mut time = self.time.borrow_mut();
3549 *time = time.advance(scope + 1);
3550 }
3551
3552 fn ready(&self, id: NodeId) -> bool {
3553 self.inner().nodes.borrow()[id.0].borrow().ready()
3554 }
3555
3556 fn cache_insert<K>(&self, key: K, val: K::Value)
3557 where
3558 K: TypedMapKey<CircuitStoreMarker> + 'static,
3559 {
3560 self.inner().store.borrow_mut().insert(key, val);
3561 }
3562
3563 fn cache_contains<K>(&self, key: &K) -> bool
3564 where
3565 K: TypedMapKey<CircuitStoreMarker> + 'static,
3566 {
3567 self.inner().store.borrow().contains_key(key)
3568 }
3569
3570 fn cache_get<K>(&self, key: &K) -> Option<K::Value>
3571 where
3572 K: TypedMapKey<CircuitStoreMarker> + 'static,
3573 K::Value: Clone,
3574 {
3575 self.inner().store.borrow().get(key).cloned()
3576 }
3577
3578 fn register_ready_callback(&self, id: NodeId, cb: Box<dyn Fn() + Send + Sync>) {
3579 self.inner().nodes.borrow()[id.0]
3580 .borrow_mut()
3581 .register_ready_callback(cb);
3582 }
3583
3584 fn is_async_node(&self, id: NodeId) -> bool {
3585 self.inner().nodes.borrow()[id.0].borrow().is_async()
3586 }
3587
    /// Evaluates node `id` and returns the progress value produced by the
    /// node's `eval`.
    ///
    /// An `eval_start` scheduler event is logged before the evaluation and an
    /// `eval_end` event after it.  If evaluation fails, the error is returned
    /// immediately and no `eval_end` event is logged.
    // The borrows of the node list and of the node itself are held for the
    // whole evaluation statement, i.e. across the `.await`, which is what the
    // lint below would otherwise flag.
    #[allow(clippy::await_holding_refcell_ref)]
    async fn eval_node(&self, id: NodeId) -> Result<Option<Position>, SchedulerError> {
        let circuit = self.inner();
        debug_assert!(id.0 < circuit.nodes.borrow().len());

        // Shared borrows only; released when the statement ends.
        circuit.log_scheduler_event(&SchedulerEvent::eval_start(
            circuit.nodes.borrow()[id.0].borrow().as_ref(),
        ));

        // The node is borrowed mutably until the returned future completes.
        let progress = circuit.nodes.borrow()[id.0].borrow_mut().eval().await?;

        circuit.log_scheduler_event(&SchedulerEvent::eval_end(
            circuit.nodes.borrow()[id.0].borrow().as_ref(),
        ));

        Ok(progress)
    }
3610
3611 fn eval_import_node(&self, id: NodeId) {
3613 let circuit = self.inner();
3614 debug_assert!(id.0 < circuit.nodes.borrow().len());
3615 debug_assert!(circuit.import_nodes().contains(&id));
3616
3617 circuit.nodes.borrow()[id.0].borrow_mut().import();
3618 }
3619
3620 fn flush_node(&self, id: NodeId) {
3621 let circuit = self.inner();
3622 debug_assert!(id.0 < circuit.nodes.borrow().len());
3623
3624 circuit.nodes.borrow()[id.0].borrow_mut().flush();
3625 }
3626
3627 fn is_flush_complete(&self, id: NodeId) -> bool {
3628 let circuit = self.inner();
3629 debug_assert!(id.0 < circuit.nodes.borrow().len());
3630
3631 circuit.nodes.borrow()[id.0].borrow().is_flush_complete()
3632 }
3633
3634 #[track_caller]
3635 fn region<F, V>(&self, name: &str, f: F) -> V
3636 where
3637 F: FnOnce() -> V,
3638 {
3639 self.log_circuit_event(&CircuitEvent::push_region(name, Some(Location::caller())));
3640 let res = f();
3641 self.log_circuit_event(&CircuitEvent::pop_region());
3642 res
3643 }
3644
3645 fn add_preprocessor(&self, preprocessor_node_id: NodeId) {
3646 for node in self.inner().nodes.borrow_mut().iter() {
3647 if node.borrow().is_input() {
3648 self.add_dependency(preprocessor_node_id, node.borrow().local_id());
3649 }
3650 }
3651 }
3652
3653 fn add_source<O, Op>(&self, operator: Op) -> Stream<Self, O>
3655 where
3656 O: Data,
3657 Op: SourceOperator<O>,
3658 {
3659 self.add_node(|id| {
3660 self.log_circuit_event(&CircuitEvent::operator(
3661 GlobalNodeId::child_of(self, id),
3662 operator.name(),
3663 operator.location(),
3664 ));
3665
3666 let node = SourceNode::new(operator, self.clone(), id);
3667 let output_stream = node.output_stream();
3668 (node, output_stream)
3669 })
3670 }
3671
3672 fn add_exchange<I, SndOp, O, RcvOp>(
3673 &self,
3674 sender: SndOp,
3675 receiver: RcvOp,
3676 input_stream: &Stream<Self, I>,
3677 ) -> Stream<Self, O>
3678 where
3679 I: Data,
3680 O: Data,
3681 SndOp: SinkOperator<I>,
3682 RcvOp: SourceOperator<O>,
3683 {
3684 let preference = sender.input_preference();
3685 self.add_exchange_with_preference(sender, receiver, input_stream, preference)
3686 }
3687
3688 fn add_exchange_with_preference<I, SndOp, O, RcvOp>(
3689 &self,
3690 sender: SndOp,
3691 receiver: RcvOp,
3692 input_stream: &Stream<Self, I>,
3693 input_preference: OwnershipPreference,
3694 ) -> Stream<Self, O>
3695 where
3696 I: Data,
3697 O: Data,
3698 SndOp: SinkOperator<I>,
3699 RcvOp: SourceOperator<O>,
3700 {
3701 let sender_id = self.add_node(|id| {
3702 self.log_circuit_event(&CircuitEvent::operator(
3703 GlobalNodeId::child_of(self, id),
3704 sender.name(),
3705 sender.location(),
3706 ));
3707
3708 let node = SinkNode::new(sender, input_stream.clone(), self.clone(), id);
3709 self.connect_stream(input_stream, id, input_preference);
3710 (node, id)
3711 });
3712
3713 let output_stream = self.add_node(|id| {
3714 self.log_circuit_event(&CircuitEvent::operator(
3715 GlobalNodeId::child_of(self, id),
3716 receiver.name(),
3717 receiver.location(),
3718 ));
3719
3720 let node = SourceNode::new(receiver, self.clone(), id);
3721 let output_stream = node.output_stream();
3722 (node, output_stream)
3723 });
3724
3725 self.add_dependency(sender_id, output_stream.local_node_id());
3726 output_stream
3727 }
3728
3729 fn add_sink<I, Op>(&self, operator: Op, input_stream: &Stream<Self, I>) -> GlobalNodeId
3730 where
3731 I: Data,
3732 Op: SinkOperator<I>,
3733 {
3734 let preference = operator.input_preference();
3735 self.add_sink_with_preference(operator, input_stream, preference)
3736 }
3737
3738 fn add_sink_with_preference<I, Op>(
3739 &self,
3740 operator: Op,
3741 input_stream: &Stream<Self, I>,
3742 input_preference: OwnershipPreference,
3743 ) -> GlobalNodeId
3744 where
3745 I: Data,
3746 Op: SinkOperator<I>,
3747 {
3748 self.add_node(|id| {
3749 let global_node_id = GlobalNodeId::child_of(self, id);
3750 self.log_circuit_event(&CircuitEvent::operator(
3753 global_node_id.clone(),
3754 operator.name(),
3755 operator.location(),
3756 ));
3757
3758 self.connect_stream(input_stream, id, input_preference);
3759 (
3760 SinkNode::new(operator, input_stream.clone(), self.clone(), id),
3761 global_node_id,
3762 )
3763 })
3764 }
3765
3766 fn add_binary_sink<I1, I2, Op>(
3768 &self,
3769 operator: Op,
3770 input_stream1: &Stream<Self, I1>,
3771 input_stream2: &Stream<Self, I2>,
3772 ) where
3773 I1: Data,
3774 I2: Data,
3775 Op: BinarySinkOperator<I1, I2>,
3776 {
3777 let (preference1, preference2) = operator.input_preference();
3778 self.add_binary_sink_with_preference(
3779 operator,
3780 (input_stream1, preference1),
3781 (input_stream2, preference2),
3782 )
3783 }
3784
3785 fn add_binary_sink_with_preference<I1, I2, Op>(
3786 &self,
3787 operator: Op,
3788 input_stream1: (&Stream<Self, I1>, OwnershipPreference),
3789 input_stream2: (&Stream<Self, I2>, OwnershipPreference),
3790 ) where
3791 I1: Data,
3792 I2: Data,
3793 Op: BinarySinkOperator<I1, I2>,
3794 {
3795 let (input_stream1, input_preference1) = input_stream1;
3796 let (input_stream2, input_preference2) = input_stream2;
3797
3798 self.add_node(|id| {
3799 self.log_circuit_event(&CircuitEvent::operator(
3800 GlobalNodeId::child_of(self, id),
3801 operator.name(),
3802 operator.location(),
3803 ));
3804
3805 let node = BinarySinkNode::new(
3806 operator,
3807 input_stream1.clone(),
3808 input_stream2.clone(),
3809 self.clone(),
3810 id,
3811 );
3812 self.connect_stream(input_stream1, id, input_preference1);
3813 self.connect_stream(input_stream2, id, input_preference2);
3814 (node, ())
3815 });
3816 }
3817
3818 fn add_ternary_sink<I1, I2, I3, Op>(
3820 &self,
3821 operator: Op,
3822 input_stream1: &Stream<Self, I1>,
3823 input_stream2: &Stream<Self, I2>,
3824 input_stream3: &Stream<Self, I3>,
3825 ) -> GlobalNodeId
3826 where
3827 I1: Data,
3828 I2: Data,
3829 I3: Data,
3830 Op: TernarySinkOperator<I1, I2, I3>,
3831 {
3832 let (preference1, preference2, preference3) = operator.input_preference();
3833 self.add_ternary_sink_with_preference(
3834 operator,
3835 (input_stream1, preference1),
3836 (input_stream2, preference2),
3837 (input_stream3, preference3),
3838 )
3839 }
3840
3841 fn add_ternary_sink_with_preference<I1, I2, I3, Op>(
3842 &self,
3843 operator: Op,
3844 input_stream1: (&Stream<Self, I1>, OwnershipPreference),
3845 input_stream2: (&Stream<Self, I2>, OwnershipPreference),
3846 input_stream3: (&Stream<Self, I3>, OwnershipPreference),
3847 ) -> GlobalNodeId
3848 where
3849 I1: Data,
3850 I2: Data,
3851 I3: Data,
3852 Op: TernarySinkOperator<I1, I2, I3>,
3853 {
3854 let (input_stream1, input_preference1) = input_stream1;
3855 let (input_stream2, input_preference2) = input_stream2;
3856 let (input_stream3, input_preference3) = input_stream3;
3857
3858 self.add_node(|id| {
3859 let global_node_id = GlobalNodeId::child_of(self, id);
3860
3861 self.log_circuit_event(&CircuitEvent::operator(
3862 GlobalNodeId::child_of(self, id),
3863 operator.name(),
3864 operator.location(),
3865 ));
3866
3867 let node = TernarySinkNode::new(
3868 operator,
3869 input_stream1.clone(),
3870 input_stream2.clone(),
3871 input_stream3.clone(),
3872 self.clone(),
3873 id,
3874 );
3875 self.connect_stream(input_stream1, id, input_preference1);
3876 self.connect_stream(input_stream2, id, input_preference2);
3877 self.connect_stream(input_stream3, id, input_preference3);
3878 (node, global_node_id)
3879 })
3880 }
3881
3882 fn add_unary_operator<I, O, Op>(
3883 &self,
3884 operator: Op,
3885 input_stream: &Stream<Self, I>,
3886 ) -> Stream<Self, O>
3887 where
3888 I: Data,
3889 O: Data,
3890 Op: UnaryOperator<I, O>,
3891 {
3892 let preference = operator.input_preference();
3893 self.add_unary_operator_with_preference(operator, input_stream, preference)
3894 }
3895
3896 fn add_unary_operator_with_preference<I, O, Op>(
3897 &self,
3898 operator: Op,
3899 input_stream: &Stream<Self, I>,
3900 input_preference: OwnershipPreference,
3901 ) -> Stream<Self, O>
3902 where
3903 I: Data,
3904 O: Data,
3905 Op: UnaryOperator<I, O>,
3906 {
3907 self.add_node(|id| {
3908 self.log_circuit_event(&CircuitEvent::operator(
3909 GlobalNodeId::child_of(self, id),
3910 operator.name(),
3911 operator.location(),
3912 ));
3913
3914 let node = UnaryNode::new(operator, input_stream.clone(), self.clone(), id);
3915 let output_stream = node.output_stream();
3916 self.connect_stream(input_stream, id, input_preference);
3917 (node, output_stream)
3918 })
3919 }
3920
3921 fn add_binary_operator<I1, I2, O, Op>(
3922 &self,
3923 operator: Op,
3924 input_stream1: &Stream<Self, I1>,
3925 input_stream2: &Stream<Self, I2>,
3926 ) -> Stream<Self, O>
3927 where
3928 I1: Data,
3929 I2: Data,
3930 O: Data,
3931 Op: BinaryOperator<I1, I2, O>,
3932 {
3933 let (pref1, pref2) = operator.input_preference();
3934 self.add_binary_operator_with_preference(
3935 operator,
3936 (input_stream1, pref1),
3937 (input_stream2, pref2),
3938 )
3939 }
3940
3941 fn add_binary_operator_with_preference<I1, I2, O, Op>(
3942 &self,
3943 operator: Op,
3944 input_stream1: (&Stream<Self, I1>, OwnershipPreference),
3945 input_stream2: (&Stream<Self, I2>, OwnershipPreference),
3946 ) -> Stream<Self, O>
3947 where
3948 I1: Data,
3949 I2: Data,
3950 O: Data,
3951 Op: BinaryOperator<I1, I2, O>,
3952 {
3953 let (input_stream1, input_preference1) = input_stream1;
3954 let (input_stream2, input_preference2) = input_stream2;
3955
3956 self.add_node(|id| {
3957 self.log_circuit_event(&CircuitEvent::operator(
3958 GlobalNodeId::child_of(self, id),
3959 operator.name(),
3960 operator.location(),
3961 ));
3962
3963 let node = BinaryNode::new(
3964 operator,
3965 input_stream1.clone(),
3966 input_stream2.clone(),
3967 self.clone(),
3968 id,
3969 );
3970 let output_stream = node.output_stream();
3971 self.connect_stream(input_stream1, id, input_preference1);
3972 self.connect_stream(input_stream2, id, input_preference2);
3973 (node, output_stream)
3974 })
3975 }
3976
3977 fn add_ternary_operator<I1, I2, I3, O, Op>(
3978 &self,
3979 operator: Op,
3980 input_stream1: &Stream<Self, I1>,
3981 input_stream2: &Stream<Self, I2>,
3982 input_stream3: &Stream<Self, I3>,
3983 ) -> Stream<Self, O>
3984 where
3985 I1: Data,
3986 I2: Data,
3987 I3: Data,
3988 O: Data,
3989 Op: TernaryOperator<I1, I2, I3, O>,
3990 {
3991 let (pref1, pref2, pref3) = operator.input_preference();
3992 self.add_ternary_operator_with_preference(
3993 operator,
3994 (input_stream1, pref1),
3995 (input_stream2, pref2),
3996 (input_stream3, pref3),
3997 )
3998 }
3999
4000 #[allow(clippy::too_many_arguments)]
4001 fn add_ternary_operator_with_preference<I1, I2, I3, O, Op>(
4002 &self,
4003 operator: Op,
4004 input_stream1: (&Stream<Self, I1>, OwnershipPreference),
4005 input_stream2: (&Stream<Self, I2>, OwnershipPreference),
4006 input_stream3: (&Stream<Self, I3>, OwnershipPreference),
4007 ) -> Stream<Self, O>
4008 where
4009 I1: Data,
4010 I2: Data,
4011 I3: Data,
4012 O: Data,
4013 Op: TernaryOperator<I1, I2, I3, O>,
4014 {
4015 let (input_stream1, input_preference1) = input_stream1;
4016 let (input_stream2, input_preference2) = input_stream2;
4017 let (input_stream3, input_preference3) = input_stream3;
4018
4019 self.add_node(|id| {
4020 self.log_circuit_event(&CircuitEvent::operator(
4021 GlobalNodeId::child_of(self, id),
4022 operator.name(),
4023 operator.location(),
4024 ));
4025
4026 let node = TernaryNode::new(
4027 operator,
4028 input_stream1.clone(),
4029 input_stream2.clone(),
4030 input_stream3.clone(),
4031 self.clone(),
4032 id,
4033 );
4034 let output_stream = node.output_stream();
4035 self.connect_stream(input_stream1, id, input_preference1);
4036 self.connect_stream(input_stream2, id, input_preference2);
4037 self.connect_stream(input_stream3, id, input_preference3);
4038 (node, output_stream)
4039 })
4040 }
4041
4042 fn add_quaternary_operator<I1, I2, I3, I4, O, Op>(
4043 &self,
4044 operator: Op,
4045 input_stream1: &Stream<Self, I1>,
4046 input_stream2: &Stream<Self, I2>,
4047 input_stream3: &Stream<Self, I3>,
4048 input_stream4: &Stream<Self, I4>,
4049 ) -> Stream<Self, O>
4050 where
4051 I1: Data,
4052 I2: Data,
4053 I3: Data,
4054 I4: Data,
4055 O: Data,
4056 Op: QuaternaryOperator<I1, I2, I3, I4, O>,
4057 {
4058 let (pref1, pref2, pref3, pref4) = operator.input_preference();
4059 self.add_quaternary_operator_with_preference(
4060 operator,
4061 (input_stream1, pref1),
4062 (input_stream2, pref2),
4063 (input_stream3, pref3),
4064 (input_stream4, pref4),
4065 )
4066 }
4067
4068 #[allow(clippy::too_many_arguments)]
4069 fn add_quaternary_operator_with_preference<I1, I2, I3, I4, O, Op>(
4070 &self,
4071 operator: Op,
4072 input_stream1: (&Stream<Self, I1>, OwnershipPreference),
4073 input_stream2: (&Stream<Self, I2>, OwnershipPreference),
4074 input_stream3: (&Stream<Self, I3>, OwnershipPreference),
4075 input_stream4: (&Stream<Self, I4>, OwnershipPreference),
4076 ) -> Stream<Self, O>
4077 where
4078 I1: Data,
4079 I2: Data,
4080 I3: Data,
4081 I4: Data,
4082 O: Data,
4083 Op: QuaternaryOperator<I1, I2, I3, I4, O>,
4084 {
4085 let (input_stream1, input_preference1) = input_stream1;
4086 let (input_stream2, input_preference2) = input_stream2;
4087 let (input_stream3, input_preference3) = input_stream3;
4088 let (input_stream4, input_preference4) = input_stream4;
4089
4090 self.add_node(|id| {
4091 self.log_circuit_event(&CircuitEvent::operator(
4092 GlobalNodeId::child_of(self, id),
4093 operator.name(),
4094 operator.location(),
4095 ));
4096
4097 let node = QuaternaryNode::new(
4098 operator,
4099 input_stream1.clone(),
4100 input_stream2.clone(),
4101 input_stream3.clone(),
4102 input_stream4.clone(),
4103 self.clone(),
4104 id,
4105 );
4106 let output_stream = node.output_stream();
4107 self.connect_stream(input_stream1, id, input_preference1);
4108 self.connect_stream(input_stream2, id, input_preference2);
4109 self.connect_stream(input_stream3, id, input_preference3);
4110 self.connect_stream(input_stream4, id, input_preference4);
4111 (node, output_stream)
4112 })
4113 }
4114
4115 fn add_nary_operator<'a, I, O, Op, Iter>(
4116 &'a self,
4117 operator: Op,
4118 input_streams: Iter,
4119 ) -> Stream<Self, O>
4120 where
4121 I: Data,
4122 O: Data,
4123 Op: NaryOperator<I, O>,
4124 Iter: IntoIterator<Item = &'a Stream<Self, I>>,
4125 {
4126 let pref = operator.input_preference();
4127 self.add_nary_operator_with_preference(operator, input_streams, pref)
4128 }
4129
4130 fn add_nary_operator_with_preference<'a, I, O, Op, Iter>(
4131 &'a self,
4132 operator: Op,
4133 input_streams: Iter,
4134 input_preference: OwnershipPreference,
4135 ) -> Stream<Self, O>
4136 where
4137 I: Data,
4138 O: Data,
4139 Op: NaryOperator<I, O>,
4140 Iter: IntoIterator<Item = &'a Stream<Self, I>>,
4141 {
4142 let input_streams: Vec<Stream<_, _>> = input_streams.into_iter().cloned().collect();
4143 self.add_node(|id| {
4144 self.log_circuit_event(&CircuitEvent::operator(
4145 GlobalNodeId::child_of(self, id),
4146 operator.name(),
4147 operator.location(),
4148 ));
4149
4150 let node = NaryNode::new(operator, input_streams.clone(), self.clone(), id);
4151 let output_stream = node.output_stream();
4152 for stream in input_streams.iter() {
4153 self.connect_stream(stream, id, input_preference);
4154 }
4155 (node, output_stream)
4156 })
4157 }
4158
4159 fn add_feedback<I, O, Op>(
4160 &self,
4161 operator: Op,
4162 ) -> (Stream<Self, O>, FeedbackConnector<Self, I, O, Op>)
4163 where
4164 I: Data,
4165 O: Data,
4166 Op: StrictUnaryOperator<I, O>,
4167 {
4168 self.add_node(|id| {
4169 self.log_circuit_event(&CircuitEvent::strict_operator_output(
4170 GlobalNodeId::child_of(self, id),
4171 operator.name(),
4172 operator.location(),
4173 ));
4174
4175 let operator = Rc::new(RefCell::new(operator));
4176 let connector = FeedbackConnector::new(id, self.clone(), operator.clone());
4177 let output_node = FeedbackOutputNode::new(operator, self.clone(), id);
4178 let local = output_node.output_stream();
4179 (output_node, (local, connector))
4180 })
4181 }
4182
4183 fn add_feedback_with_export<I, O, Op>(
4184 &self,
4185 operator: Op,
4186 ) -> (ExportStream<Self, O>, FeedbackConnector<Self, I, O, Op>)
4187 where
4188 I: Data,
4189 O: Data,
4190 Op: StrictUnaryOperator<I, O>,
4191 {
4192 self.add_node(|id| {
4193 self.log_circuit_event(&CircuitEvent::strict_operator_output(
4194 GlobalNodeId::child_of(self, id),
4195 operator.name(),
4196 operator.location(),
4197 ));
4198
4199 let operator = Rc::new(RefCell::new(operator));
4200 let connector = FeedbackConnector::new(id, self.clone(), operator.clone());
4201 let output_node = FeedbackOutputNode::with_export(operator, self.clone(), id);
4202 let local = output_node.output_stream();
4203 let export = output_node.export_stream.clone().unwrap();
4204 (output_node, (ExportStream { local, export }, connector))
4205 })
4206 }
4207
4208 fn connect_feedback_with_preference<I, O, Op>(
4212 &self,
4213 output_node_id: NodeId,
4214 operator: Rc<RefCell<Op>>,
4215 input_stream: &Stream<Self, I>,
4216 input_preference: OwnershipPreference,
4217 ) where
4218 I: Data,
4219 O: Data,
4220 Op: StrictUnaryOperator<I, O>,
4221 {
4222 self.add_node(|id| {
4223 self.log_circuit_event(&CircuitEvent::strict_operator_input(
4224 GlobalNodeId::child_of(self, id),
4225 output_node_id,
4226 ));
4227
4228 let output_node = FeedbackInputNode::new(operator, input_stream.clone(), id);
4229 self.connect_stream(input_stream, id, input_preference);
4230 self.add_dependency(output_node_id, id);
4231 (output_node, ())
4232 })
4233 }
4234
4235 fn iterative_subcircuit<F, V, E>(&self, child_constructor: F) -> Result<V, SchedulerError>
4236 where
4237 F: FnOnce(&mut IterativeCircuit<Self>) -> Result<(V, E), SchedulerError>,
4238 E: Executor<IterativeCircuit<Self>>,
4239 {
4240 self.try_add_node(|id| {
4241 let global_id = GlobalNodeId::child_of(self, id);
4242 self.log_circuit_event(&CircuitEvent::subcircuit(global_id.clone(), true));
4243 let mut child_circuit = ChildCircuit::with_parent(self.clone(), id);
4244 let (res, executor) = child_constructor(&mut child_circuit)?;
4245 let child = <ChildNode<IterativeCircuit<Self>>>::new::<E>(child_circuit, 1, executor);
4246 self.log_circuit_event(&CircuitEvent::subcircuit_complete(global_id));
4247 Ok((child, res))
4248 })
4249 }
4250
4251 fn non_iterative_subcircuit<F, V, E>(&self, child_constructor: F) -> Result<V, SchedulerError>
4252 where
4253 F: FnOnce(&mut NonIterativeCircuit<Self>) -> Result<(V, E), SchedulerError>,
4254 E: Executor<NonIterativeCircuit<Self>>,
4255 {
4256 self.try_add_node(|id| {
4257 let global_id = GlobalNodeId::child_of(self, id);
4258 self.log_circuit_event(&CircuitEvent::subcircuit(global_id.clone(), false));
4259 let mut child_circuit = ChildCircuit::with_parent(self.clone(), id);
4260 let (res, executor) = child_constructor(&mut child_circuit)?;
4261 let child =
4262 <ChildNode<NonIterativeCircuit<Self>>>::new::<E>(child_circuit, 0, executor);
4263 self.log_circuit_event(&CircuitEvent::subcircuit_complete(global_id));
4264 Ok((child, res))
4265 })
4266 }
4267
4268 fn iterate<F, C, V>(&self, constructor: F) -> Result<V, SchedulerError>
4269 where
4270 F: FnOnce(&mut IterativeCircuit<Self>) -> Result<(C, V), SchedulerError>,
4271 C: AsyncFn() -> Result<bool, SchedulerError> + 'static,
4272 {
4273 self.iterate_with_scheduler::<F, C, V, DynamicScheduler>(constructor)
4274 }
4275
4276 fn iterate_with_scheduler<F, C, V, S>(&self, constructor: F) -> Result<V, SchedulerError>
4281 where
4282 F: FnOnce(&mut IterativeCircuit<Self>) -> Result<(C, V), SchedulerError>,
4283 C: AsyncFn() -> Result<bool, SchedulerError> + 'static,
4284 S: Scheduler + 'static,
4285 {
4286 self.iterative_subcircuit(|child| {
4287 let (termination_check, res) = constructor(child)?;
4288 let mut executor = <IterativeExecutor<_, S>>::new(termination_check);
4289 executor.prepare(child, None)?;
4290 Ok((res, executor))
4291 })
4292 }
4293
4294 fn fixedpoint<F, V>(&self, constructor: F) -> Result<V, SchedulerError>
4295 where
4296 F: FnOnce(&mut IterativeCircuit<Self>) -> Result<V, SchedulerError>,
4297 {
4298 self.fixedpoint_with_scheduler::<F, V, DynamicScheduler>(constructor)
4299 }
4300
4301 fn fixedpoint_with_scheduler<F, V, S>(&self, constructor: F) -> Result<V, SchedulerError>
4302 where
4303 F: FnOnce(&mut IterativeCircuit<Self>) -> Result<V, SchedulerError>,
4304 S: Scheduler + 'static,
4305 {
4306 self.iterative_subcircuit(|child| {
4307 let res = constructor(child)?;
4308 let child_clone = child.clone();
4309
4310 let consensus = Consensus::new();
4311
4312 let termination_check = async move || {
4313 let local_fixedpoint = child_clone.inner().check_fixedpoint(0);
4315 consensus.check(local_fixedpoint).await
4316 };
4317 let mut executor = <IterativeExecutor<_, S>>::new(termination_check);
4318 executor.prepare(child, None)?;
4319 Ok((res, executor))
4320 })
4321 }
4322
4323 fn import_stream<I, O, Op>(&self, operator: Op, parent_stream: &Stream<P, I>) -> Stream<Self, O>
4324 where
4325 Self::Parent: Circuit,
4326 I: Data,
4327 O: Data,
4328 Op: ImportOperator<I, O>,
4329 {
4330 let preference = operator.input_preference();
4331 self.import_stream_with_preference(operator, parent_stream, preference)
4332 }
4333
4334 fn import_stream_with_preference<I, O, Op>(
4335 &self,
4336 operator: Op,
4337 parent_stream: &Stream<P, I>,
4338 input_preference: OwnershipPreference,
4339 ) -> Stream<Self, O>
4340 where
4341 Self::Parent: Circuit,
4342 I: Data,
4343 O: Data,
4344 Op: ImportOperator<I, O>,
4345 {
4346 assert!(self.is_child_of(parent_stream.circuit()));
4347
4348 let output_stream = self.add_node(|id| {
4349 let node_id = self.global_node_id().child(id);
4350 self.log_circuit_event(&CircuitEvent::operator(
4351 node_id.clone(),
4352 operator.name(),
4353 operator.location(),
4354 ));
4355 let node = ImportNode::new(operator, self.clone(), parent_stream.clone(), id);
4356 self.parent()
4358 .connect_stream(parent_stream, self.node_id(), input_preference);
4359 self.parent().log_circuit_event(&CircuitEvent::stream(
4361 parent_stream.origin_node_id().clone(),
4362 node_id.clone(),
4363 input_preference,
4364 ));
4365 let output_stream = node.output_stream();
4366 (node, output_stream)
4367 });
4368
4369 self.add_import_node(output_stream.local_node_id());
4370
4371 output_stream
4372 }
4373
4374 fn add_replay_edges(&self, stream_id: StreamId, replay_stream: &dyn StreamMetadata) {
4375 let mut edges = self.edges_mut();
4376 let mut new_edges = Vec::new();
4377
4378 let Some(edges_to_replay) = edges.get_by_stream_id(&Some(stream_id)) else {
4379 return;
4380 };
4381
4382 for edge in edges_to_replay {
4383 new_edges.push(Edge {
4390 from: replay_stream.local_node_id(),
4391 to: edge.to,
4392 origin: replay_stream.origin_node_id().clone(),
4393 stream: Some(clone_box(replay_stream)),
4394 ownership_preference: edge.ownership_preference,
4395 });
4396 }
4397
4398 edges.extend(new_edges);
4399 }
4400}
4401struct ImportNode<C, I, O, Op>
4402where
4403 C: Circuit,
4404{
4405 id: GlobalNodeId,
4406 operator: Op,
4407 parent_stream: Stream<C::Parent, I>,
4408 output_stream: Stream<C, O>,
4409 labels: BTreeMap<String, String>,
4410}
4411
4412impl<C, I, O, Op> ImportNode<C, I, O, Op>
4413where
4414 C: Circuit,
4415 C::Parent: Circuit,
4416 I: Clone + 'static,
4417 O: Clone + 'static,
4418 Op: ImportOperator<I, O>,
4419{
4420 fn new(operator: Op, circuit: C, parent_stream: Stream<C::Parent, I>, id: NodeId) -> Self {
4421 assert!(Circuit::ptr_eq(&circuit.parent(), parent_stream.circuit()));
4422
4423 Self {
4424 id: circuit.global_node_id().child(id),
4425 operator,
4426 parent_stream,
4427 output_stream: Stream::new(circuit, id),
4428 labels: BTreeMap::new(),
4429 }
4430 }
4431
4432 fn output_stream(&self) -> Stream<C, O> {
4433 self.output_stream.clone()
4434 }
4435}
4436
4437impl<C, I, O, Op> Node for ImportNode<C, I, O, Op>
4438where
4439 C: Circuit,
4440 C::Parent: Circuit,
4441 I: Clone + 'static,
4442 O: Clone + 'static,
4443 Op: ImportOperator<I, O>,
4444{
4445 fn name(&self) -> Cow<'static, str> {
4446 self.operator.name()
4447 }
4448
4449 fn local_id(&self) -> NodeId {
4450 self.id.local_node_id().unwrap()
4451 }
4452
4453 fn global_id(&self) -> &GlobalNodeId {
4454 &self.id
4455 }
4456
4457 fn is_async(&self) -> bool {
4458 self.operator.is_async()
4459 }
4460
4461 fn is_input(&self) -> bool {
4462 self.operator.is_input()
4463 }
4464
4465 fn ready(&self) -> bool {
4466 self.operator.ready()
4467 }
4468
4469 fn register_ready_callback(&mut self, cb: Box<dyn Fn() + Send + Sync>) {
4470 self.operator.register_ready_callback(cb);
4471 }
4472
4473 fn eval<'a>(
4474 &'a mut self,
4475 ) -> Pin<Box<dyn Future<Output = Result<Option<Position>, SchedulerError>> + 'a>> {
4476 Box::pin(async {
4477 self.output_stream.put(self.operator.eval().await);
4478 Ok(self.operator.flush_progress())
4479 })
4480 }
4481
4482 fn import(&mut self) {
4483 match StreamValue::take(self.parent_stream.val()) {
4484 None => self
4485 .operator
4486 .import(StreamValue::peek(&self.parent_stream.get())),
4487 Some(val) => self.operator.import_owned(val),
4488 }
4489
4490 StreamValue::consume_token(self.parent_stream.val());
4491 }
4492
4493 fn start_transaction(&mut self) {
4494 self.operator.start_transaction();
4495 }
4496
4497 fn flush(&mut self) {
4498 self.operator.flush();
4499 }
4500
4501 fn is_flush_complete(&self) -> bool {
4502 self.operator.is_flush_complete()
4503 }
4504
4505 fn clock_start(&mut self, scope: Scope) {
4506 self.operator.clock_start(scope);
4507 }
4508
4509 fn clock_end(&mut self, scope: Scope) {
4510 self.operator.clock_end(scope);
4511 }
4512
4513 fn init(&mut self) {
4514 self.operator.init(&self.id);
4515 }
4516
4517 fn metadata(&self, output: &mut OperatorMeta) {
4518 self.operator.metadata(output);
4519 }
4520
4521 fn fixedpoint(&self, scope: Scope) -> bool {
4522 self.operator.fixedpoint(scope)
4523 }
4524
4525 fn checkpoint(
4526 &mut self,
4527 base: &StoragePath,
4528 files: &mut Vec<Arc<dyn FileCommitter>>,
4529 ) -> Result<(), DbspError> {
4530 self.operator
4531 .checkpoint(base, self.persistent_id().as_deref(), files)
4532 }
4533
4534 fn restore(&mut self, base: &StoragePath) -> Result<(), DbspError> {
4535 self.operator.restore(base, self.persistent_id().as_deref())
4536 }
4537
4538 fn clear_state(&mut self) -> Result<(), DbspError> {
4539 self.operator.clear_state()
4540 }
4541
4542 fn start_replay(&mut self) -> Result<(), DbspError> {
4543 self.operator.start_replay()
4544 }
4545
4546 fn end_replay(&mut self) -> Result<(), DbspError> {
4547 self.operator.end_replay()
4548 }
4549
4550 fn is_replay_complete(&self) -> bool {
4551 self.operator.is_replay_complete()
4552 }
4553
4554 fn set_label(&mut self, key: &str, value: &str) {
4555 self.labels.insert(key.to_string(), value.to_string());
4556 }
4557
4558 fn get_label(&self, key: &str) -> Option<&str> {
4559 self.labels.get(key).map(|s| s.as_str())
4560 }
4561
4562 fn labels(&self) -> &BTreeMap<String, String> {
4563 &self.labels
4564 }
4565
4566 fn as_any(&self) -> &dyn Any {
4567 self
4568 }
4569}
4570
4571struct SourceNode<C, O, Op> {
4572 id: GlobalNodeId,
4573 operator: Op,
4574 output_stream: Stream<C, O>,
4575 labels: BTreeMap<String, String>,
4576}
4577
4578impl<C, O, Op> SourceNode<C, O, Op>
4579where
4580 Op: SourceOperator<O>,
4581 C: Circuit,
4582{
4583 fn new(operator: Op, circuit: C, id: NodeId) -> Self {
4584 Self {
4585 id: circuit.global_node_id().child(id),
4586 operator,
4587 output_stream: Stream::new(circuit, id),
4588 labels: BTreeMap::new(),
4589 }
4590 }
4591
4592 fn output_stream(&self) -> Stream<C, O> {
4593 self.output_stream.clone()
4594 }
4595}
4596
4597impl<C, O, Op> Node for SourceNode<C, O, Op>
4598where
4599 C: Circuit,
4600 O: Clone + 'static,
4601 Op: SourceOperator<O>,
4602{
4603 fn name(&self) -> Cow<'static, str> {
4604 self.operator.name()
4605 }
4606
4607 fn local_id(&self) -> NodeId {
4608 self.id.local_node_id().unwrap()
4609 }
4610
4611 fn global_id(&self) -> &GlobalNodeId {
4612 &self.id
4613 }
4614
4615 fn is_async(&self) -> bool {
4616 self.operator.is_async()
4617 }
4618
4619 fn is_input(&self) -> bool {
4620 self.operator.is_input()
4621 }
4622
4623 fn ready(&self) -> bool {
4624 self.operator.ready()
4625 }
4626
4627 fn register_ready_callback(&mut self, cb: Box<dyn Fn() + Send + Sync>) {
4628 self.operator.register_ready_callback(cb);
4629 }
4630
4631 fn eval<'a>(
4632 &'a mut self,
4633 ) -> Pin<Box<dyn Future<Output = Result<Option<Position>, SchedulerError>> + 'a>> {
4634 Box::pin(async {
4635 self.output_stream.put(self.operator.eval().await);
4636 Ok(self.operator.flush_progress())
4637 })
4638 }
4639
4640 fn start_transaction(&mut self) {
4641 self.operator.start_transaction();
4642 }
4643
4644 fn flush(&mut self) {
4645 self.operator.flush();
4646 }
4647
4648 fn is_flush_complete(&self) -> bool {
4649 self.operator.is_flush_complete()
4650 }
4651
4652 fn clock_start(&mut self, scope: Scope) {
4653 self.operator.clock_start(scope);
4654 }
4655
4656 fn clock_end(&mut self, scope: Scope) {
4657 self.operator.clock_end(scope);
4658 }
4659
4660 fn init(&mut self) {
4661 self.operator.init(&self.id);
4662 }
4663
4664 fn metadata(&self, output: &mut OperatorMeta) {
4665 self.operator.metadata(output);
4666 }
4667
4668 fn fixedpoint(&self, scope: Scope) -> bool {
4669 self.operator.fixedpoint(scope)
4670 }
4671
4672 fn checkpoint(
4673 &mut self,
4674 base: &StoragePath,
4675 files: &mut Vec<Arc<dyn FileCommitter>>,
4676 ) -> Result<(), DbspError> {
4677 self.operator
4678 .checkpoint(base, self.persistent_id().as_deref(), files)
4679 }
4680
4681 fn restore(&mut self, base: &StoragePath) -> Result<(), DbspError> {
4682 self.operator.restore(base, self.persistent_id().as_deref())
4683 }
4684
4685 fn clear_state(&mut self) -> Result<(), DbspError> {
4686 self.operator.clear_state()
4687 }
4688
4689 fn start_replay(&mut self) -> Result<(), DbspError> {
4690 self.operator.start_replay()
4691 }
4692
4693 fn is_replay_complete(&self) -> bool {
4694 self.operator.is_replay_complete()
4695 }
4696
4697 fn end_replay(&mut self) -> Result<(), DbspError> {
4698 self.operator.end_replay()
4699 }
4700
4701 fn set_label(&mut self, key: &str, value: &str) {
4702 self.labels.insert(key.to_string(), value.to_string());
4703 }
4704
4705 fn get_label(&self, key: &str) -> Option<&str> {
4706 self.labels.get(key).map(|s| s.as_str())
4707 }
4708
4709 fn labels(&self) -> &BTreeMap<String, String> {
4710 &self.labels
4711 }
4712
4713 fn as_any(&self) -> &dyn Any {
4714 self
4715 }
4716}
4717
4718struct UnaryNode<C, I, O, Op> {
4719 id: GlobalNodeId,
4720 operator: Op,
4721 input_stream: Stream<C, I>,
4722 output_stream: Stream<C, O>,
4723 labels: BTreeMap<String, String>,
4724}
4725
4726impl<C, I, O, Op> UnaryNode<C, I, O, Op>
4727where
4728 Op: UnaryOperator<I, O>,
4729 C: Circuit,
4730{
4731 fn new(operator: Op, input_stream: Stream<C, I>, circuit: C, id: NodeId) -> Self {
4732 Self {
4733 id: circuit.global_node_id().child(id),
4734 operator,
4735 input_stream,
4736 output_stream: Stream::new(circuit, id),
4737 labels: BTreeMap::new(),
4738 }
4739 }
4740
4741 fn output_stream(&self) -> Stream<C, O> {
4742 self.output_stream.clone()
4743 }
4744}
4745
4746impl<C, I, O, Op> Node for UnaryNode<C, I, O, Op>
4747where
4748 C: Circuit,
4749 I: Clone + 'static,
4750 O: Clone + 'static,
4751 Op: UnaryOperator<I, O>,
4752{
4753 fn name(&self) -> Cow<'static, str> {
4754 self.operator.name()
4755 }
4756
4757 fn local_id(&self) -> NodeId {
4758 self.id.local_node_id().unwrap()
4759 }
4760
4761 fn global_id(&self) -> &GlobalNodeId {
4762 &self.id
4763 }
4764
4765 fn is_async(&self) -> bool {
4766 self.operator.is_async()
4767 }
4768
4769 fn is_input(&self) -> bool {
4770 self.operator.is_input()
4771 }
4772
4773 fn ready(&self) -> bool {
4774 self.operator.ready()
4775 }
4776
4777 fn register_ready_callback(&mut self, cb: Box<dyn Fn() + Send + Sync>) {
4778 self.operator.register_ready_callback(cb);
4779 }
4780
4781 #[allow(clippy::await_holding_refcell_ref)]
4783 fn eval<'a>(
4784 &'a mut self,
4785 ) -> Pin<Box<dyn Future<Output = Result<Option<Position>, SchedulerError>> + 'a>> {
4786 Box::pin(async {
4787 self.output_stream
4788 .put(match StreamValue::take(self.input_stream.val()) {
4789 Some(v) => self.operator.eval_owned(v).await,
4790 None => {
4791 self.operator
4792 .eval(StreamValue::peek(&self.input_stream.get()))
4793 .await
4794 }
4795 });
4796 StreamValue::consume_token(self.input_stream.val());
4797 Ok(self.operator.flush_progress())
4798 })
4799 }
4800
4801 fn start_transaction(&mut self) {
4802 self.operator.start_transaction();
4803 }
4804
4805 fn flush(&mut self) {
4806 self.operator.flush();
4807 }
4808
4809 fn is_flush_complete(&self) -> bool {
4810 self.operator.is_flush_complete()
4811 }
4812
4813 fn clock_start(&mut self, scope: Scope) {
4814 self.operator.clock_start(scope);
4815 }
4816
4817 fn clock_end(&mut self, scope: Scope) {
4818 self.operator.clock_end(scope);
4819 }
4820
4821 fn init(&mut self) {
4822 self.operator.init(&self.id);
4823 }
4824
4825 fn metadata(&self, output: &mut OperatorMeta) {
4826 self.operator.metadata(output);
4827 }
4828
4829 fn fixedpoint(&self, scope: Scope) -> bool {
4830 self.operator.fixedpoint(scope)
4831 }
4832
4833 fn checkpoint(
4834 &mut self,
4835 base: &StoragePath,
4836 files: &mut Vec<Arc<dyn FileCommitter>>,
4837 ) -> Result<(), DbspError> {
4838 self.operator
4839 .checkpoint(base, self.persistent_id().as_deref(), files)
4840 }
4841
4842 fn restore(&mut self, base: &StoragePath) -> Result<(), DbspError> {
4843 self.operator.restore(base, self.persistent_id().as_deref())
4844 }
4845
4846 fn clear_state(&mut self) -> Result<(), DbspError> {
4847 self.operator.clear_state()
4848 }
4849
4850 fn start_replay(&mut self) -> Result<(), DbspError> {
4851 self.operator.start_replay()
4852 }
4853
4854 fn is_replay_complete(&self) -> bool {
4855 self.operator.is_replay_complete()
4856 }
4857
4858 fn end_replay(&mut self) -> Result<(), DbspError> {
4859 self.operator.end_replay()
4860 }
4861
4862 fn set_label(&mut self, key: &str, value: &str) {
4863 self.labels.insert(key.to_string(), value.to_string());
4864 }
4865
4866 fn get_label(&self, key: &str) -> Option<&str> {
4867 self.labels.get(key).map(|s| s.as_str())
4868 }
4869
4870 fn labels(&self) -> &BTreeMap<String, String> {
4871 &self.labels
4872 }
4873
4874 fn as_any(&self) -> &dyn Any {
4875 self
4876 }
4877}
4878
4879struct SinkNode<C, I, Op> {
4880 id: GlobalNodeId,
4881 operator: Op,
4882 input_stream: Stream<C, I>,
4883 labels: BTreeMap<String, String>,
4884}
4885
4886impl<C, I, Op> SinkNode<C, I, Op>
4887where
4888 Op: SinkOperator<I>,
4889 C: Circuit,
4890{
4891 fn new(operator: Op, input_stream: Stream<C, I>, circuit: C, id: NodeId) -> Self {
4892 Self {
4893 id: circuit.global_node_id().child(id),
4894 operator,
4895 input_stream,
4896 labels: BTreeMap::new(),
4897 }
4898 }
4899}
4900
4901impl<C, I, Op> Node for SinkNode<C, I, Op>
4902where
4903 C: Circuit,
4904 I: Clone + 'static,
4905 Op: SinkOperator<I>,
4906{
4907 fn name(&self) -> Cow<'static, str> {
4908 self.operator.name()
4909 }
4910
4911 fn local_id(&self) -> NodeId {
4912 self.id.local_node_id().unwrap()
4913 }
4914
4915 fn global_id(&self) -> &GlobalNodeId {
4916 &self.id
4917 }
4918
4919 fn is_async(&self) -> bool {
4920 self.operator.is_async()
4921 }
4922
4923 fn is_input(&self) -> bool {
4924 self.operator.is_input()
4925 }
4926
4927 fn ready(&self) -> bool {
4928 self.operator.ready()
4929 }
4930
4931 fn register_ready_callback(&mut self, cb: Box<dyn Fn() + Send + Sync>) {
4932 self.operator.register_ready_callback(cb);
4933 }
4934
4935 #[allow(clippy::await_holding_refcell_ref)]
4937 fn eval<'a>(
4938 &'a mut self,
4939 ) -> Pin<Box<dyn Future<Output = Result<Option<Position>, SchedulerError>> + 'a>> {
4940 Box::pin(async {
4941 match StreamValue::take(self.input_stream.val()) {
4942 Some(v) => self.operator.eval_owned(v).await,
4943 None => {
4944 self.operator
4945 .eval(StreamValue::peek(&self.input_stream.get()))
4946 .await
4947 }
4948 };
4949 StreamValue::consume_token(self.input_stream.val());
4950
4951 Ok(self.operator.flush_progress())
4952 })
4953 }
4954
4955 fn start_transaction(&mut self) {
4956 self.operator.start_transaction();
4957 }
4958
4959 fn flush(&mut self) {
4960 self.operator.flush();
4961 }
4962
4963 fn is_flush_complete(&self) -> bool {
4964 self.operator.is_flush_complete()
4965 }
4966
4967 fn clock_start(&mut self, scope: Scope) {
4968 self.operator.clock_start(scope);
4969 }
4970
4971 fn clock_end(&mut self, scope: Scope) {
4972 self.operator.clock_end(scope);
4973 }
4974
4975 fn init(&mut self) {
4976 self.operator.init(&self.id);
4977 }
4978
4979 fn metadata(&self, output: &mut OperatorMeta) {
4980 self.operator.metadata(output);
4981 }
4982
4983 fn fixedpoint(&self, scope: Scope) -> bool {
4984 self.operator.fixedpoint(scope)
4985 }
4986
4987 fn checkpoint(
4988 &mut self,
4989 base: &StoragePath,
4990 files: &mut Vec<Arc<dyn FileCommitter>>,
4991 ) -> Result<(), DbspError> {
4992 self.operator
4993 .checkpoint(base, self.persistent_id().as_deref(), files)
4994 }
4995
4996 fn restore(&mut self, base: &StoragePath) -> Result<(), DbspError> {
4997 self.operator.restore(base, self.persistent_id().as_deref())
4998 }
4999
5000 fn clear_state(&mut self) -> Result<(), DbspError> {
5001 self.operator.clear_state()
5002 }
5003
5004 fn start_replay(&mut self) -> Result<(), DbspError> {
5005 self.operator.start_replay()
5006 }
5007
5008 fn is_replay_complete(&self) -> bool {
5009 self.operator.is_replay_complete()
5010 }
5011
5012 fn end_replay(&mut self) -> Result<(), DbspError> {
5013 self.operator.end_replay()
5014 }
5015
5016 fn set_label(&mut self, key: &str, value: &str) {
5017 self.labels.insert(key.to_string(), value.to_string());
5018 }
5019
5020 fn get_label(&self, key: &str) -> Option<&str> {
5021 self.labels.get(key).map(|s| s.as_str())
5022 }
5023
5024 fn labels(&self) -> &BTreeMap<String, String> {
5025 &self.labels
5026 }
5027
5028 fn as_any(&self) -> &dyn Any {
5029 self
5030 }
5031}
5032
5033struct BinarySinkNode<C, I1, I2, Op> {
5034 id: GlobalNodeId,
5035 operator: Op,
5036 input_stream1: Stream<C, I1>,
5037 input_stream2: Stream<C, I2>,
5038 is_alias: bool,
5040 labels: BTreeMap<String, String>,
5041}
5042
5043impl<C, I1, I2, Op> BinarySinkNode<C, I1, I2, Op>
5044where
5045 I1: Clone,
5046 I2: Clone,
5047 Op: BinarySinkOperator<I1, I2>,
5048 C: Circuit,
5049{
5050 fn new(
5051 operator: Op,
5052 input_stream1: Stream<C, I1>,
5053 input_stream2: Stream<C, I2>,
5054 circuit: C,
5055 id: NodeId,
5056 ) -> Self {
5057 let is_alias = input_stream1.ptr_eq(&input_stream2);
5058 Self {
5059 id: circuit.global_node_id().child(id),
5060 operator,
5061 input_stream1,
5062 input_stream2,
5063 is_alias,
5064 labels: BTreeMap::new(),
5065 }
5066 }
5067}
5068
/// `Node` implementation for a two-input sink. Apart from `eval` and the label
/// accessors, every method forwards to the wrapped operator.
impl<C, I1, I2, Op> Node for BinarySinkNode<C, I1, I2, Op>
where
    C: Circuit,
    I1: Clone + 'static,
    I2: Clone + 'static,
    Op: BinarySinkOperator<I1, I2>,
{
    /// Returns the wrapped operator's name.
    fn name(&self) -> Cow<'static, str> {
        self.operator.name()
    }

    /// Returns the local component of this node's id.
    ///
    /// Panics (via `unwrap`) if `local_node_id()` has nothing to return.
    fn local_id(&self) -> NodeId {
        self.id.local_node_id().unwrap()
    }

    /// Returns this node's global id.
    fn global_id(&self) -> &GlobalNodeId {
        &self.id
    }

    /// Forwards to the operator's `is_async`.
    fn is_async(&self) -> bool {
        self.operator.is_async()
    }

    /// Forwards to the operator's `is_input`.
    fn is_input(&self) -> bool {
        self.operator.is_input()
    }

    /// Forwards to the operator's `ready`.
    fn ready(&self) -> bool {
        self.operator.ready()
    }

    /// Hands the readiness callback `cb` to the operator.
    fn register_ready_callback(&mut self, cb: Box<dyn Fn() + Send + Sync>) {
        self.operator.register_ready_callback(cb);
    }

    /// Feeds the current values of both inputs to the operator and returns the
    /// operator's `flush_progress()` wrapped in `Ok`.
    ///
    /// Values obtained through `get()` are kept alive while the operator's
    /// future is awaited, which is what the lint exception below is for.
    #[allow(clippy::await_holding_refcell_ref)]
    fn eval<'a>(
        &'a mut self,
    ) -> Pin<Box<dyn Future<Output = Result<Option<Position>, SchedulerError>> + 'a>> {
        Box::pin(async {
            if self.is_alias {
                // Both inputs are the same stream: pass the value by reference
                // on both sides. The inner block ends the two borrows before
                // the tokens are consumed.
                {
                    let val1 = self.input_stream1.get();
                    let val2 = self.input_stream2.get();
                    self.operator
                        .eval(
                            Cow::Borrowed(StreamValue::peek(&val1)),
                            Cow::Borrowed(StreamValue::peek(&val2)),
                        )
                        .await;
                }

                // One token per input, i.e. two tokens on the shared stream.
                StreamValue::consume_token(self.input_stream1.val());
                StreamValue::consume_token(self.input_stream2.val());
            } else {
                // Try to move each value out of its stream. `take` yields
                // `None` when the value cannot be moved out; in that case the
                // value is borrowed with `peek` instead.
                // NOTE(review): presumably `take` only succeeds for the last
                // remaining consumer of the stream — confirm in `StreamValue`.
                let val1 = StreamValue::take(self.input_stream1.val());
                let val2 = StreamValue::take(self.input_stream2.val());

                match (val1, val2) {
                    (Some(val1), Some(val2)) => {
                        self.operator.eval(Cow::Owned(val1), Cow::Owned(val2)).await;
                    }
                    (Some(val1), None) => {
                        self.operator
                            .eval(
                                Cow::Owned(val1),
                                Cow::Borrowed(StreamValue::peek(&self.input_stream2.get())),
                            )
                            .await;
                    }
                    (None, Some(val2)) => {
                        self.operator
                            .eval(
                                Cow::Borrowed(StreamValue::peek(&self.input_stream1.get())),
                                Cow::Owned(val2),
                            )
                            .await;
                    }
                    (None, None) => {
                        self.operator
                            .eval(
                                Cow::Borrowed(StreamValue::peek(&self.input_stream1.get())),
                                Cow::Borrowed(StreamValue::peek(&self.input_stream2.get())),
                            )
                            .await;
                    }
                }

                // Tokens are consumed only after the operator has finished
                // with the (possibly borrowed) values.
                StreamValue::consume_token(self.input_stream1.val());
                StreamValue::consume_token(self.input_stream2.val());
            };

            Ok(self.operator.flush_progress())
        })
    }

    /// Forwards to the operator's `start_transaction`.
    fn start_transaction(&mut self) {
        self.operator.start_transaction();
    }

    /// Forwards to the operator's `flush`.
    fn flush(&mut self) {
        self.operator.flush();
    }

    /// Returns the operator's `is_flush_complete` answer.
    fn is_flush_complete(&self) -> bool {
        self.operator.is_flush_complete()
    }

    /// Forwards the start-of-clock notification for `scope` to the operator.
    fn clock_start(&mut self, scope: Scope) {
        self.operator.clock_start(scope);
    }

    /// Forwards the end-of-clock notification for `scope` to the operator.
    fn clock_end(&mut self, scope: Scope) {
        self.operator.clock_end(scope);
    }

    /// Initializes the operator, handing it this node's global id.
    fn init(&mut self) {
        self.operator.init(&self.id);
    }

    /// Lets the operator write its metadata into `output`.
    fn metadata(&self, output: &mut OperatorMeta) {
        self.operator.metadata(output);
    }

    /// Returns the operator's `fixedpoint` answer for `scope`.
    fn fixedpoint(&self, scope: Scope) -> bool {
        self.operator.fixedpoint(scope)
    }

    /// Asks the operator to checkpoint itself under `base`, passing along this
    /// node's persistent id and the `files` list.
    fn checkpoint(
        &mut self,
        base: &StoragePath,
        files: &mut Vec<Arc<dyn FileCommitter>>,
    ) -> Result<(), DbspError> {
        self.operator
            .checkpoint(base, self.persistent_id().as_deref(), files)
    }

    /// Asks the operator to restore itself from `base`, identified by this
    /// node's persistent id.
    fn restore(&mut self, base: &StoragePath) -> Result<(), DbspError> {
        self.operator.restore(base, self.persistent_id().as_deref())
    }

    /// Forwards to the operator's `clear_state`.
    fn clear_state(&mut self) -> Result<(), DbspError> {
        self.operator.clear_state()
    }

    /// Forwards to the operator's `start_replay`.
    fn start_replay(&mut self) -> Result<(), DbspError> {
        self.operator.start_replay()
    }

    /// Returns the operator's `is_replay_complete` answer.
    fn is_replay_complete(&self) -> bool {
        self.operator.is_replay_complete()
    }

    /// Forwards to the operator's `end_replay`.
    fn end_replay(&mut self) -> Result<(), DbspError> {
        self.operator.end_replay()
    }

    /// Stores the label `key = value`, replacing any previous value for `key`.
    fn set_label(&mut self, key: &str, value: &str) {
        self.labels.insert(key.to_string(), value.to_string());
    }

    /// Returns the label stored under `key`, if any.
    fn get_label(&self, key: &str) -> Option<&str> {
        self.labels.get(key).map(|s| s.as_str())
    }

    /// Returns all labels attached to this node.
    fn labels(&self) -> &BTreeMap<String, String> {
        &self.labels
    }

    /// Exposes the node as `&dyn Any` for downcasting.
    fn as_any(&self) -> &dyn Any {
        self
    }
}
5243
/// Circuit node wrapping a [`TernarySinkOperator`]: it reads three input
/// streams and has no output stream.
struct TernarySinkNode<C, I1, I2, I3, Op> {
    /// Global id of this node.
    id: GlobalNodeId,
    /// The wrapped operator; the node's `Node` methods forward to it.
    operator: Op,
    /// First input stream.
    input_stream1: Stream<C, I1>,
    /// Second input stream.
    input_stream2: Stream<C, I2>,
    /// Third input stream.
    input_stream3: Stream<C, I3>,
    /// Labels attached to this node through `set_label`.
    labels: BTreeMap<String, String>,
}
5252
5253impl<C, I1, I2, I3, Op> TernarySinkNode<C, I1, I2, I3, Op>
5254where
5255 I1: Clone,
5256 I2: Clone,
5257 I3: Clone,
5258 Op: TernarySinkOperator<I1, I2, I3>,
5259 C: Circuit,
5260{
5261 fn new(
5262 operator: Op,
5263 input_stream1: Stream<C, I1>,
5264 input_stream2: Stream<C, I2>,
5265 input_stream3: Stream<C, I3>,
5266 circuit: C,
5267 id: NodeId,
5268 ) -> Self {
5269 assert!(!input_stream1.ptr_eq(&input_stream2));
5270 assert!(!input_stream1.ptr_eq(&input_stream3));
5271 assert!(!input_stream2.ptr_eq(&input_stream3));
5272
5273 Self {
5274 id: circuit.global_node_id().child(id),
5275 operator,
5276 input_stream1,
5277 input_stream2,
5278 input_stream3,
5279 labels: BTreeMap::new(),
5280 }
5281 }
5282}
5283
/// `Node` implementation for a three-input sink. Apart from `eval` and the
/// label accessors, every method forwards to the wrapped operator.
impl<C, I1, I2, I3, Op> Node for TernarySinkNode<C, I1, I2, I3, Op>
where
    C: Circuit,
    I1: Clone + 'static,
    I2: Clone + 'static,
    I3: Clone + 'static,
    Op: TernarySinkOperator<I1, I2, I3>,
{
    /// Returns the wrapped operator's name.
    fn name(&self) -> Cow<'static, str> {
        self.operator.name()
    }

    /// Returns the local component of this node's id.
    ///
    /// Panics (via `unwrap`) if `local_node_id()` has nothing to return.
    fn local_id(&self) -> NodeId {
        self.id.local_node_id().unwrap()
    }

    /// Returns this node's global id.
    fn global_id(&self) -> &GlobalNodeId {
        &self.id
    }

    /// Forwards to the operator's `is_async`.
    fn is_async(&self) -> bool {
        self.operator.is_async()
    }

    /// Forwards to the operator's `is_input`.
    fn is_input(&self) -> bool {
        self.operator.is_input()
    }

    /// Forwards to the operator's `ready`.
    fn ready(&self) -> bool {
        self.operator.ready()
    }

    /// Hands the readiness callback `cb` to the operator.
    fn register_ready_callback(&mut self, cb: Box<dyn Fn() + Send + Sync>) {
        self.operator.register_ready_callback(cb);
    }

    /// Feeds the current values of the three inputs to the operator and
    /// returns the operator's `flush_progress()` wrapped in `Ok`.
    ///
    /// Values obtained through `get()` are kept alive while the operator's
    /// future is awaited, which is what the lint exception below is for.
    #[allow(clippy::await_holding_refcell_ref)]
    fn eval<'a>(
        &'a mut self,
    ) -> Pin<Box<dyn Future<Output = Result<Option<Position>, SchedulerError>> + 'a>> {
        Box::pin(async {
            // For each input: first try to move the value out (`take`), then
            // also obtain a borrow (`get`) that serves as the fallback when
            // `take` returned `None`.
            let val1 = StreamValue::take(self.input_stream1.val()).map(|val| Cow::Owned(val));
            let r1 = self.input_stream1.get();
            let val2 = StreamValue::take(self.input_stream2.val()).map(|val| Cow::Owned(val));
            let r2 = self.input_stream2.get();
            let val3 = StreamValue::take(self.input_stream3.val()).map(|val| Cow::Owned(val));
            let r3 = self.input_stream3.get();

            self.operator
                .eval(
                    val1.unwrap_or_else(|| Cow::Borrowed(StreamValue::peek(&r1))),
                    val2.unwrap_or_else(|| Cow::Borrowed(StreamValue::peek(&r2))),
                    val3.unwrap_or_else(|| Cow::Borrowed(StreamValue::peek(&r3))),
                )
                .await;

            // End the borrows explicitly before consuming the tokens.
            drop(r1);
            drop(r2);
            drop(r3);

            StreamValue::consume_token(self.input_stream1.val());
            StreamValue::consume_token(self.input_stream2.val());
            StreamValue::consume_token(self.input_stream3.val());

            Ok(self.operator.flush_progress())
        })
    }

    /// Forwards to the operator's `start_transaction`.
    fn start_transaction(&mut self) {
        self.operator.start_transaction();
    }

    /// Forwards to the operator's `flush`.
    fn flush(&mut self) {
        self.operator.flush();
    }

    /// Returns the operator's `is_flush_complete` answer.
    fn is_flush_complete(&self) -> bool {
        self.operator.is_flush_complete()
    }

    /// Forwards the start-of-clock notification for `scope` to the operator.
    fn clock_start(&mut self, scope: Scope) {
        self.operator.clock_start(scope);
    }

    /// Forwards the end-of-clock notification for `scope` to the operator.
    fn clock_end(&mut self, scope: Scope) {
        self.operator.clock_end(scope);
    }

    /// Initializes the operator, handing it this node's global id.
    fn init(&mut self) {
        self.operator.init(&self.id);
    }

    /// Lets the operator write its metadata into `output`.
    fn metadata(&self, output: &mut OperatorMeta) {
        self.operator.metadata(output);
    }

    /// Returns the operator's `fixedpoint` answer for `scope`.
    fn fixedpoint(&self, scope: Scope) -> bool {
        self.operator.fixedpoint(scope)
    }

    /// Asks the operator to checkpoint itself under `base`, passing along this
    /// node's persistent id and the `files` list.
    fn checkpoint(
        &mut self,
        base: &StoragePath,
        files: &mut Vec<Arc<dyn FileCommitter>>,
    ) -> Result<(), DbspError> {
        self.operator
            .checkpoint(base, self.persistent_id().as_deref(), files)
    }

    /// Asks the operator to restore itself from `base`, identified by this
    /// node's persistent id.
    fn restore(&mut self, base: &StoragePath) -> Result<(), DbspError> {
        self.operator.restore(base, self.persistent_id().as_deref())
    }

    /// Forwards to the operator's `clear_state`.
    fn clear_state(&mut self) -> Result<(), DbspError> {
        self.operator.clear_state()
    }

    /// Forwards to the operator's `start_replay`.
    fn start_replay(&mut self) -> Result<(), DbspError> {
        self.operator.start_replay()
    }

    /// Returns the operator's `is_replay_complete` answer.
    fn is_replay_complete(&self) -> bool {
        self.operator.is_replay_complete()
    }

    /// Forwards to the operator's `end_replay`.
    fn end_replay(&mut self) -> Result<(), DbspError> {
        self.operator.end_replay()
    }

    /// Stores the label `key = value`, replacing any previous value for `key`.
    fn set_label(&mut self, key: &str, value: &str) {
        self.labels.insert(key.to_string(), value.to_string());
    }

    /// Returns the label stored under `key`, if any.
    fn get_label(&self, key: &str) -> Option<&str> {
        self.labels.get(key).map(|s| s.as_str())
    }

    /// Returns all labels attached to this node.
    fn labels(&self) -> &BTreeMap<String, String> {
        &self.labels
    }

    /// Exposes the node as `&dyn Any` for downcasting.
    fn as_any(&self) -> &dyn Any {
        self
    }
}
5430
/// Circuit node wrapping a [`BinaryOperator`]: it reads two input streams and
/// writes the operator's result to one output stream.
struct BinaryNode<C, I1, I2, O, Op> {
    /// Global id of this node.
    id: GlobalNodeId,
    /// The wrapped operator; the node's `Node` methods forward to it.
    operator: Op,
    /// First input stream.
    input_stream1: Stream<C, I1>,
    /// Second input stream.
    input_stream2: Stream<C, I2>,
    /// Stream that receives the operator's output on every `eval`.
    output_stream: Stream<C, O>,
    /// `true` when both inputs are the same stream (`ptr_eq`), as computed by
    /// the constructor.
    is_alias: bool,
    /// Labels attached to this node through `set_label`.
    labels: BTreeMap<String, String>,
}
5441
5442impl<C, I1, I2, O, Op> BinaryNode<C, I1, I2, O, Op>
5443where
5444 Op: BinaryOperator<I1, I2, O>,
5445 C: Circuit,
5446{
5447 fn new(
5448 operator: Op,
5449 input_stream1: Stream<C, I1>,
5450 input_stream2: Stream<C, I2>,
5451 circuit: C,
5452 id: NodeId,
5453 ) -> Self {
5454 let is_alias = input_stream1.ptr_eq(&input_stream2);
5455 Self {
5456 id: circuit.global_node_id().child(id),
5457 operator,
5458 input_stream1,
5459 input_stream2,
5460 is_alias,
5461 output_stream: Stream::new(circuit, id),
5462 labels: BTreeMap::new(),
5463 }
5464 }
5465
5466 fn output_stream(&self) -> Stream<C, O> {
5467 self.output_stream.clone()
5468 }
5469}
5470
/// `Node` implementation for a two-input operator with one output. Apart from
/// `eval` and the label accessors, every method forwards to the wrapped
/// operator.
impl<C, I1, I2, O, Op> Node for BinaryNode<C, I1, I2, O, Op>
where
    C: Circuit,
    I1: Clone + 'static,
    I2: Clone + 'static,
    O: Clone + 'static,
    Op: BinaryOperator<I1, I2, O>,
{
    /// Returns the wrapped operator's name.
    fn name(&self) -> Cow<'static, str> {
        self.operator.name()
    }

    /// Returns the local component of this node's id.
    ///
    /// Panics (via `unwrap`) if `local_node_id()` has nothing to return.
    fn local_id(&self) -> NodeId {
        self.id.local_node_id().unwrap()
    }

    /// Returns this node's global id.
    fn global_id(&self) -> &GlobalNodeId {
        &self.id
    }

    /// Forwards to the operator's `is_async`.
    fn is_async(&self) -> bool {
        self.operator.is_async()
    }

    /// Forwards to the operator's `is_input`.
    fn is_input(&self) -> bool {
        self.operator.is_input()
    }

    /// Forwards to the operator's `ready`.
    fn ready(&self) -> bool {
        self.operator.ready()
    }

    /// Hands the readiness callback `cb` to the operator.
    fn register_ready_callback(&mut self, cb: Box<dyn Fn() + Send + Sync>) {
        self.operator.register_ready_callback(cb);
    }

    /// Evaluates the operator on the current values of both inputs, puts the
    /// result on the output stream, and returns the operator's
    /// `flush_progress()` wrapped in `Ok`.
    ///
    /// Values obtained through `get()` are kept alive while the operator's
    /// future is awaited, which is what the lint exception below is for.
    #[allow(clippy::await_holding_refcell_ref)]
    fn eval<'a>(
        &'a mut self,
    ) -> Pin<Box<dyn Future<Output = Result<Option<Position>, SchedulerError>> + 'a>> {
        Box::pin(async {
            if self.is_alias {
                // Both inputs are the same stream: evaluate by reference on
                // both sides. The inner block ends the two borrows before the
                // tokens are consumed.
                {
                    let val1 = self.input_stream1.get();
                    let val2 = self.input_stream2.get();

                    self.output_stream.put(
                        self.operator
                            .eval(StreamValue::peek(&val1), StreamValue::peek(&val2))
                            .await,
                    );
                }
                // One token per input, i.e. two tokens on the shared stream.
                StreamValue::consume_token(self.input_stream1.val());
                StreamValue::consume_token(self.input_stream2.val());
            } else {
                // Try to move each value out of its stream. `take` yields
                // `None` when the value cannot be moved out; the operator
                // variant matching the owned/borrowed combination is then
                // selected below.
                // NOTE(review): presumably `take` only succeeds for the last
                // remaining consumer of the stream — confirm in `StreamValue`.
                let val1 = StreamValue::take(self.input_stream1.val());
                let val2 = StreamValue::take(self.input_stream2.val());

                self.output_stream.put(match (val1, val2) {
                    (Some(val1), Some(val2)) => self.operator.eval_owned(val1, val2).await,
                    (Some(val1), None) => {
                        self.operator
                            .eval_owned_and_ref(val1, StreamValue::peek(&self.input_stream2.get()))
                            .await
                    }
                    (None, Some(val2)) => {
                        self.operator
                            .eval_ref_and_owned(StreamValue::peek(&self.input_stream1.get()), val2)
                            .await
                    }
                    (None, None) => {
                        self.operator
                            .eval(
                                StreamValue::peek(&self.input_stream1.get()),
                                StreamValue::peek(&self.input_stream2.get()),
                            )
                            .await
                    }
                });
                // Tokens are consumed only after the operator has finished
                // with the (possibly borrowed) values.
                StreamValue::consume_token(self.input_stream1.val());
                StreamValue::consume_token(self.input_stream2.val());
            }
            Ok(self.operator.flush_progress())
        })
    }

    /// Forwards to the operator's `start_transaction`.
    fn start_transaction(&mut self) {
        self.operator.start_transaction();
    }

    /// Forwards to the operator's `flush`.
    fn flush(&mut self) {
        self.operator.flush();
    }

    /// Returns the operator's `is_flush_complete` answer.
    fn is_flush_complete(&self) -> bool {
        self.operator.is_flush_complete()
    }

    /// Forwards the start-of-clock notification for `scope` to the operator.
    fn clock_start(&mut self, scope: Scope) {
        self.operator.clock_start(scope);
    }

    /// Forwards the end-of-clock notification for `scope` to the operator.
    fn clock_end(&mut self, scope: Scope) {
        self.operator.clock_end(scope);
    }

    /// Initializes the operator, handing it this node's global id.
    fn init(&mut self) {
        self.operator.init(&self.id);
    }

    /// Lets the operator write its metadata into `output`.
    fn metadata(&self, output: &mut OperatorMeta) {
        self.operator.metadata(output);
    }

    /// Returns the operator's `fixedpoint` answer for `scope`.
    fn fixedpoint(&self, scope: Scope) -> bool {
        self.operator.fixedpoint(scope)
    }

    /// Asks the operator to checkpoint itself under `base`, passing along this
    /// node's persistent id and the `files` list.
    fn checkpoint(
        &mut self,
        base: &StoragePath,
        files: &mut Vec<Arc<dyn FileCommitter>>,
    ) -> Result<(), DbspError> {
        self.operator
            .checkpoint(base, self.persistent_id().as_deref(), files)
    }

    /// Asks the operator to restore itself from `base`, identified by this
    /// node's persistent id.
    fn restore(&mut self, base: &StoragePath) -> Result<(), DbspError> {
        self.operator.restore(base, self.persistent_id().as_deref())
    }

    /// Forwards to the operator's `clear_state`.
    fn clear_state(&mut self) -> Result<(), DbspError> {
        self.operator.clear_state()
    }

    /// Forwards to the operator's `start_replay`.
    fn start_replay(&mut self) -> Result<(), DbspError> {
        self.operator.start_replay()
    }

    /// Returns the operator's `is_replay_complete` answer.
    fn is_replay_complete(&self) -> bool {
        self.operator.is_replay_complete()
    }

    /// Forwards to the operator's `end_replay`.
    fn end_replay(&mut self) -> Result<(), DbspError> {
        self.operator.end_replay()
    }

    /// Stores the label `key = value`, replacing any previous value for `key`.
    fn set_label(&mut self, key: &str, value: &str) {
        self.labels.insert(key.to_string(), value.to_string());
    }

    /// Returns the label stored under `key`, if any.
    fn get_label(&self, key: &str) -> Option<&str> {
        self.labels.get(key).map(|s| s.as_str())
    }

    /// Returns all labels attached to this node.
    fn labels(&self) -> &BTreeMap<String, String> {
        &self.labels
    }

    /// Exposes the node as `&dyn Any` for downcasting.
    fn as_any(&self) -> &dyn Any {
        self
    }
}
5641
/// Circuit node wrapping a [`TernaryOperator`]: it reads three input streams
/// and writes the operator's result to one output stream.
struct TernaryNode<C, I1, I2, I3, O, Op> {
    /// Global id of this node.
    id: GlobalNodeId,
    /// The wrapped operator; the node's `Node` methods forward to it.
    operator: Op,
    /// First input stream.
    input_stream1: Stream<C, I1>,
    /// Second input stream.
    input_stream2: Stream<C, I2>,
    /// Third input stream.
    input_stream3: Stream<C, I3>,
    /// Stream that receives the operator's output on every `eval`.
    output_stream: Stream<C, O>,
    /// Labels attached to this node through `set_label`.
    labels: BTreeMap<String, String>,
}
5651
5652impl<C, I1, I2, I3, O, Op> TernaryNode<C, I1, I2, I3, O, Op>
5653where
5654 I1: Clone,
5655 I2: Clone,
5656 I3: Clone,
5657 Op: TernaryOperator<I1, I2, I3, O>,
5658 C: Circuit,
5659{
5660 fn new(
5661 operator: Op,
5662 input_stream1: Stream<C, I1>,
5663 input_stream2: Stream<C, I2>,
5664 input_stream3: Stream<C, I3>,
5665 circuit: C,
5666 id: NodeId,
5667 ) -> Self {
5668 Self {
5669 id: circuit.global_node_id().child(id),
5670 operator,
5671 input_stream1,
5672 input_stream2,
5673 input_stream3,
5674 output_stream: Stream::new(circuit, id),
5677 labels: BTreeMap::new(),
5678 }
5679 }
5680
5681 fn output_stream(&self) -> Stream<C, O> {
5682 self.output_stream.clone()
5683 }
5684}
5685
/// `Node` implementation for a three-input operator with one output. Apart
/// from `eval` and the label accessors, every method forwards to the wrapped
/// operator.
impl<C, I1, I2, I3, O, Op> Node for TernaryNode<C, I1, I2, I3, O, Op>
where
    C: Circuit,
    I1: Clone + 'static,
    I2: Clone + 'static,
    I3: Clone + 'static,
    O: Clone + 'static,
    Op: TernaryOperator<I1, I2, I3, O>,
{
    /// Returns the wrapped operator's name.
    fn name(&self) -> Cow<'static, str> {
        self.operator.name()
    }

    /// Returns the local component of this node's id.
    ///
    /// Panics (via `unwrap`) if `local_node_id()` has nothing to return.
    fn local_id(&self) -> NodeId {
        self.id.local_node_id().unwrap()
    }

    /// Returns this node's global id.
    fn global_id(&self) -> &GlobalNodeId {
        &self.id
    }

    /// Forwards to the operator's `is_async`.
    fn is_async(&self) -> bool {
        self.operator.is_async()
    }

    /// Forwards to the operator's `is_input`.
    fn is_input(&self) -> bool {
        self.operator.is_input()
    }

    /// Forwards to the operator's `ready`.
    fn ready(&self) -> bool {
        self.operator.ready()
    }

    /// Hands the readiness callback `cb` to the operator.
    fn register_ready_callback(&mut self, cb: Box<dyn Fn() + Send + Sync>) {
        self.operator.register_ready_callback(cb);
    }

    /// Evaluates the operator on the current values of the three inputs (all
    /// passed by reference), puts the result on the output stream, and returns
    /// the operator's `flush_progress()` wrapped in `Ok`.
    ///
    /// Values obtained through `get()` are kept alive while the operator's
    /// future is awaited, which is what the lint exception below is for.
    #[allow(clippy::await_holding_refcell_ref)]
    fn eval<'a>(
        &'a mut self,
    ) -> Pin<Box<dyn Future<Output = Result<Option<Position>, SchedulerError>> + 'a>> {
        Box::pin(async {
            // The inner block ends the temporary borrows of the inputs before
            // the tokens are consumed.
            {
                self.output_stream.put(
                    self.operator
                        .eval(
                            Cow::Borrowed(StreamValue::peek(&self.input_stream1.get())),
                            Cow::Borrowed(StreamValue::peek(&self.input_stream2.get())),
                            Cow::Borrowed(StreamValue::peek(&self.input_stream3.get())),
                        )
                        .await,
                );
            }

            StreamValue::consume_token(self.input_stream1.val());
            StreamValue::consume_token(self.input_stream2.val());
            StreamValue::consume_token(self.input_stream3.val());

            Ok(self.operator.flush_progress())
        })
    }

    /// Forwards to the operator's `start_transaction`.
    fn start_transaction(&mut self) {
        self.operator.start_transaction();
    }

    /// Forwards to the operator's `flush`.
    fn flush(&mut self) {
        self.operator.flush();
    }

    /// Returns the operator's `is_flush_complete` answer.
    fn is_flush_complete(&self) -> bool {
        self.operator.is_flush_complete()
    }

    /// Forwards the start-of-clock notification for `scope` to the operator.
    fn clock_start(&mut self, scope: Scope) {
        self.operator.clock_start(scope);
    }

    /// Forwards the end-of-clock notification for `scope` to the operator.
    fn clock_end(&mut self, scope: Scope) {
        self.operator.clock_end(scope);
    }

    /// Initializes the operator, handing it this node's global id.
    fn init(&mut self) {
        self.operator.init(&self.id);
    }

    /// Lets the operator write its metadata into `output`.
    fn metadata(&self, output: &mut OperatorMeta) {
        self.operator.metadata(output);
    }

    /// Returns the operator's `fixedpoint` answer for `scope`.
    fn fixedpoint(&self, scope: Scope) -> bool {
        self.operator.fixedpoint(scope)
    }

    /// Asks the operator to checkpoint itself under `base`, passing along this
    /// node's persistent id and the `files` list.
    fn checkpoint(
        &mut self,
        base: &StoragePath,
        files: &mut Vec<Arc<dyn FileCommitter>>,
    ) -> Result<(), DbspError> {
        self.operator
            .checkpoint(base, self.persistent_id().as_deref(), files)
    }

    /// Asks the operator to restore itself from `base`, identified by this
    /// node's persistent id.
    fn restore(&mut self, base: &StoragePath) -> Result<(), DbspError> {
        self.operator.restore(base, self.persistent_id().as_deref())
    }

    /// Forwards to the operator's `clear_state`.
    fn clear_state(&mut self) -> Result<(), DbspError> {
        self.operator.clear_state()
    }

    /// Forwards to the operator's `start_replay`.
    fn start_replay(&mut self) -> Result<(), DbspError> {
        self.operator.start_replay()
    }

    /// Returns the operator's `is_replay_complete` answer.
    fn is_replay_complete(&self) -> bool {
        self.operator.is_replay_complete()
    }

    /// Forwards to the operator's `end_replay`.
    fn end_replay(&mut self) -> Result<(), DbspError> {
        self.operator.end_replay()
    }

    /// Stores the label `key = value`, replacing any previous value for `key`.
    fn set_label(&mut self, key: &str, value: &str) {
        self.labels.insert(key.to_string(), value.to_string());
    }

    /// Returns the label stored under `key`, if any.
    fn get_label(&self, key: &str) -> Option<&str> {
        self.labels.get(key).map(|s| s.as_str())
    }

    /// Returns all labels attached to this node.
    fn labels(&self) -> &BTreeMap<String, String> {
        &self.labels
    }

    /// Exposes the node as `&dyn Any` for downcasting.
    fn as_any(&self) -> &dyn Any {
        self
    }
}
5826
/// Circuit node wrapping a [`QuaternaryOperator`]: it reads four input streams
/// and writes the operator's result to one output stream.
struct QuaternaryNode<C, I1, I2, I3, I4, O, Op> {
    /// Global id of this node.
    id: GlobalNodeId,
    /// The wrapped operator; the node's `Node` methods forward to it.
    operator: Op,
    /// First input stream.
    input_stream1: Stream<C, I1>,
    /// Second input stream.
    input_stream2: Stream<C, I2>,
    /// Third input stream.
    input_stream3: Stream<C, I3>,
    /// Fourth input stream.
    input_stream4: Stream<C, I4>,
    /// Stream that receives the operator's output on every `eval`.
    output_stream: Stream<C, O>,
    /// Labels attached to this node through `set_label`.
    labels: BTreeMap<String, String>,
}
5844
5845impl<C, I1, I2, I3, I4, O, Op> QuaternaryNode<C, I1, I2, I3, I4, O, Op>
5846where
5847 I1: Clone,
5848 I2: Clone,
5849 I3: Clone,
5850 I4: Clone,
5851 Op: QuaternaryOperator<I1, I2, I3, I4, O>,
5852 C: Circuit,
5853{
5854 fn new(
5855 operator: Op,
5856 input_stream1: Stream<C, I1>,
5857 input_stream2: Stream<C, I2>,
5858 input_stream3: Stream<C, I3>,
5859 input_stream4: Stream<C, I4>,
5860 circuit: C,
5861 id: NodeId,
5862 ) -> Self {
5863 Self {
5870 id: circuit.global_node_id().child(id),
5871 operator,
5872 input_stream1,
5873 input_stream2,
5874 input_stream3,
5875 input_stream4,
5876 output_stream: Stream::new(circuit, id),
5880 labels: BTreeMap::new(),
5881 }
5882 }
5883
5884 fn output_stream(&self) -> Stream<C, O> {
5885 self.output_stream.clone()
5886 }
5887}
5888
/// `Node` implementation for a four-input operator with one output. Apart
/// from `eval` and the label accessors, every method forwards to the wrapped
/// operator.
impl<C, I1, I2, I3, I4, O, Op> Node for QuaternaryNode<C, I1, I2, I3, I4, O, Op>
where
    C: Circuit,
    I1: Clone + 'static,
    I2: Clone + 'static,
    I3: Clone + 'static,
    I4: Clone + 'static,
    O: Clone + 'static,
    Op: QuaternaryOperator<I1, I2, I3, I4, O>,
{
    /// Returns the wrapped operator's name.
    fn name(&self) -> Cow<'static, str> {
        self.operator.name()
    }

    /// Returns the local component of this node's id.
    ///
    /// Panics (via `unwrap`) if `local_node_id()` has nothing to return.
    fn local_id(&self) -> NodeId {
        self.id.local_node_id().unwrap()
    }

    /// Returns this node's global id.
    fn global_id(&self) -> &GlobalNodeId {
        &self.id
    }

    /// Forwards to the operator's `is_async`.
    fn is_async(&self) -> bool {
        self.operator.is_async()
    }

    /// Forwards to the operator's `is_input`.
    fn is_input(&self) -> bool {
        self.operator.is_input()
    }

    /// Forwards to the operator's `ready`.
    fn ready(&self) -> bool {
        self.operator.ready()
    }

    /// Hands the readiness callback `cb` to the operator.
    fn register_ready_callback(&mut self, cb: Box<dyn Fn() + Send + Sync>) {
        self.operator.register_ready_callback(cb);
    }

    /// Evaluates the operator on the current values of the four inputs (all
    /// passed by reference), puts the result on the output stream, and returns
    /// the operator's `flush_progress()` wrapped in `Ok`.
    ///
    /// Values obtained through `get()` are kept alive while the operator's
    /// future is awaited, which is what the lint exception below is for.
    #[allow(clippy::await_holding_refcell_ref)]
    fn eval<'a>(
        &'a mut self,
    ) -> Pin<Box<dyn Future<Output = Result<Option<Position>, SchedulerError>> + 'a>> {
        Box::pin(async {
            // The inner block ends the temporary borrows of the inputs before
            // the tokens are consumed.
            {
                self.output_stream.put(
                    self.operator
                        .eval(
                            Cow::Borrowed(StreamValue::peek(&self.input_stream1.get())),
                            Cow::Borrowed(StreamValue::peek(&self.input_stream2.get())),
                            Cow::Borrowed(StreamValue::peek(&self.input_stream3.get())),
                            Cow::Borrowed(StreamValue::peek(&self.input_stream4.get())),
                        )
                        .await,
                );
            }

            StreamValue::consume_token(self.input_stream1.val());
            StreamValue::consume_token(self.input_stream2.val());
            StreamValue::consume_token(self.input_stream3.val());
            StreamValue::consume_token(self.input_stream4.val());

            Ok(self.operator.flush_progress())
        })
    }

    /// Forwards to the operator's `start_transaction`.
    fn start_transaction(&mut self) {
        self.operator.start_transaction();
    }

    /// Forwards to the operator's `flush`.
    fn flush(&mut self) {
        self.operator.flush();
    }

    /// Returns the operator's `is_flush_complete` answer.
    fn is_flush_complete(&self) -> bool {
        self.operator.is_flush_complete()
    }

    /// Forwards the start-of-clock notification for `scope` to the operator.
    fn clock_start(&mut self, scope: Scope) {
        self.operator.clock_start(scope);
    }

    /// Forwards the end-of-clock notification for `scope` to the operator.
    fn clock_end(&mut self, scope: Scope) {
        self.operator.clock_end(scope);
    }

    /// Initializes the operator, handing it this node's global id.
    fn init(&mut self) {
        self.operator.init(&self.id);
    }

    /// Lets the operator write its metadata into `output`.
    fn metadata(&self, output: &mut OperatorMeta) {
        self.operator.metadata(output);
    }

    /// Returns the operator's `fixedpoint` answer for `scope`.
    fn fixedpoint(&self, scope: Scope) -> bool {
        self.operator.fixedpoint(scope)
    }

    /// Asks the operator to checkpoint itself under `base`, passing along this
    /// node's persistent id and the `files` list.
    fn checkpoint(
        &mut self,
        base: &StoragePath,
        files: &mut Vec<Arc<dyn FileCommitter>>,
    ) -> Result<(), DbspError> {
        self.operator
            .checkpoint(base, self.persistent_id().as_deref(), files)
    }

    /// Asks the operator to restore itself from `base`, identified by this
    /// node's persistent id.
    fn restore(&mut self, base: &StoragePath) -> Result<(), DbspError> {
        self.operator.restore(base, self.persistent_id().as_deref())
    }

    /// Forwards to the operator's `clear_state`.
    fn clear_state(&mut self) -> Result<(), DbspError> {
        self.operator.clear_state()
    }

    /// Forwards to the operator's `start_replay`.
    fn start_replay(&mut self) -> Result<(), DbspError> {
        self.operator.start_replay()
    }

    /// Returns the operator's `is_replay_complete` answer.
    fn is_replay_complete(&self) -> bool {
        self.operator.is_replay_complete()
    }

    /// Forwards to the operator's `end_replay`.
    fn end_replay(&mut self) -> Result<(), DbspError> {
        self.operator.end_replay()
    }

    /// Stores the label `key = value`, replacing any previous value for `key`.
    fn set_label(&mut self, key: &str, value: &str) {
        self.labels.insert(key.to_string(), value.to_string());
    }

    /// Returns the label stored under `key`, if any.
    fn get_label(&self, key: &str) -> Option<&str> {
        self.labels.get(key).map(|s| s.as_str())
    }

    /// Returns all labels attached to this node.
    fn labels(&self) -> &BTreeMap<String, String> {
        &self.labels
    }

    /// Exposes the node as `&dyn Any` for downcasting.
    fn as_any(&self) -> &dyn Any {
        self
    }
}
6032
/// Circuit node wrapping a [`NaryOperator`]: it reads any number of input
/// streams of the same value type `I` and writes the operator's result to one
/// output stream.
struct NaryNode<C, I, O, Op>
where
    I: Clone + 'static,
{
    /// Global id of this node.
    id: GlobalNodeId,
    /// The wrapped operator; the node's `Node` methods forward to it.
    operator: Op,
    /// Input streams, in the order they were supplied to the constructor.
    input_streams: Vec<Stream<C, I>>,
    /// Stream that receives the operator's output on every `eval`.
    output_stream: Stream<C, O>,
    /// Labels attached to this node through `set_label`.
    labels: BTreeMap<String, String>,
}
6047
6048impl<C, I, O, Op> NaryNode<C, I, O, Op>
6049where
6050 I: Clone + 'static,
6051 Op: NaryOperator<I, O>,
6052 C: Circuit,
6053{
6054 fn new<Iter>(operator: Op, input_streams: Iter, circuit: C, id: NodeId) -> Self
6055 where
6056 Iter: IntoIterator<Item = Stream<C, I>>,
6057 {
6058 let mut input_streams: Vec<_> = input_streams.into_iter().collect();
6059 input_streams.shrink_to_fit();
6071 Self {
6072 id: circuit.global_node_id().child(id),
6073 operator,
6074 input_streams,
6075 output_stream: Stream::new(circuit, id),
6077 labels: BTreeMap::new(),
6078 }
6079 }
6080
6081 fn output_stream(&self) -> Stream<C, O> {
6082 self.output_stream.clone()
6083 }
6084}
6085
/// `Node` implementation for an operator with any number of same-typed inputs
/// and one output. Apart from `eval` and the label accessors, every method
/// forwards to the wrapped operator.
impl<C, I, O, Op> Node for NaryNode<C, I, O, Op>
where
    C: Circuit,
    I: Clone,
    O: Clone + 'static,
    Op: NaryOperator<I, O>,
{
    /// Returns the wrapped operator's name.
    fn name(&self) -> Cow<'static, str> {
        self.operator.name()
    }

    /// Returns the local component of this node's id.
    ///
    /// Panics (via `unwrap`) if `local_node_id()` has nothing to return.
    fn local_id(&self) -> NodeId {
        self.id.local_node_id().unwrap()
    }

    /// Returns this node's global id.
    fn global_id(&self) -> &GlobalNodeId {
        &self.id
    }

    /// Forwards to the operator's `is_async`.
    fn is_async(&self) -> bool {
        self.operator.is_async()
    }

    /// Forwards to the operator's `is_input`.
    fn is_input(&self) -> bool {
        self.operator.is_input()
    }

    /// Forwards to the operator's `ready`.
    fn ready(&self) -> bool {
        self.operator.ready()
    }

    /// Hands the readiness callback `cb` to the operator.
    fn register_ready_callback(&mut self, cb: Box<dyn Fn() + Send + Sync>) {
        self.operator.register_ready_callback(cb);
    }

    /// Evaluates the operator on the current values of all inputs (passed by
    /// reference, in input order), puts the result on the output stream, and
    /// returns the operator's `flush_progress()` wrapped in `Ok`.
    fn eval<'a>(
        &'a mut self,
    ) -> Pin<Box<dyn Future<Output = Result<Option<Position>, SchedulerError>> + 'a>> {
        Box::pin(async {
            // Borrow every input up front; the borrows live in `refs` while
            // the operator runs.
            let refs = self
                .input_streams
                .iter()
                .map(|stream| stream.get())
                .collect::<Vec<_>>();

            self.output_stream.put(
                self.operator
                    .eval(refs.iter().map(|r| Cow::Borrowed(StreamValue::peek(r))))
                    .await,
            );

            // End all borrows before consuming the tokens.
            std::mem::drop(refs);

            for i in self.input_streams.iter() {
                StreamValue::consume_token(i.val());
            }
            Ok(self.operator.flush_progress())
        })
    }

    /// Forwards to the operator's `start_transaction`.
    fn start_transaction(&mut self) {
        self.operator.start_transaction();
    }

    /// Forwards to the operator's `flush`.
    fn flush(&mut self) {
        self.operator.flush();
    }

    /// Returns the operator's `is_flush_complete` answer.
    fn is_flush_complete(&self) -> bool {
        self.operator.is_flush_complete()
    }

    /// Forwards the start-of-clock notification for `scope` to the operator.
    fn clock_start(&mut self, scope: Scope) {
        self.operator.clock_start(scope);
    }

    /// Forwards the end-of-clock notification for `scope` to the operator.
    fn clock_end(&mut self, scope: Scope) {
        self.operator.clock_end(scope);
    }

    /// Initializes the operator, handing it this node's global id.
    fn init(&mut self) {
        self.operator.init(&self.id);
    }

    /// Lets the operator write its metadata into `output`.
    fn metadata(&self, output: &mut OperatorMeta) {
        self.operator.metadata(output);
    }

    /// Returns the operator's `fixedpoint` answer for `scope`.
    fn fixedpoint(&self, scope: Scope) -> bool {
        self.operator.fixedpoint(scope)
    }

    /// Asks the operator to checkpoint itself under `base`, passing along this
    /// node's persistent id and the `files` list.
    fn checkpoint(
        &mut self,
        base: &StoragePath,
        files: &mut Vec<Arc<dyn FileCommitter>>,
    ) -> Result<(), DbspError> {
        self.operator
            .checkpoint(base, self.persistent_id().as_deref(), files)
    }

    /// Asks the operator to restore itself from `base`, identified by this
    /// node's persistent id.
    fn restore(&mut self, base: &StoragePath) -> Result<(), DbspError> {
        self.operator.restore(base, self.persistent_id().as_deref())
    }

    /// Forwards to the operator's `clear_state`.
    fn clear_state(&mut self) -> Result<(), DbspError> {
        self.operator.clear_state()
    }

    /// Forwards to the operator's `start_replay`.
    fn start_replay(&mut self) -> Result<(), DbspError> {
        self.operator.start_replay()
    }

    /// Returns the operator's `is_replay_complete` answer.
    fn is_replay_complete(&self) -> bool {
        self.operator.is_replay_complete()
    }

    /// Forwards to the operator's `end_replay`.
    fn end_replay(&mut self) -> Result<(), DbspError> {
        self.operator.end_replay()
    }

    /// Stores the label `key = value`, replacing any previous value for `key`.
    fn set_label(&mut self, key: &str, value: &str) {
        self.labels.insert(key.to_string(), value.to_string());
    }

    /// Returns the label stored under `key`, if any.
    fn get_label(&self, key: &str) -> Option<&str> {
        self.labels.get(key).map(|s| s.as_str())
    }

    /// Returns all labels attached to this node.
    fn labels(&self) -> &BTreeMap<String, String> {
        &self.labels
    }

    /// Exposes the node as `&dyn Any` for downcasting.
    fn as_any(&self) -> &dyn Any {
        self
    }
}
6223
/// Output side of a [`StrictUnaryOperator`] wrapped as a circuit node: on each
/// `eval` it puts the operator's `get_output()` on its output stream.
///
/// The operator is held behind `Rc<RefCell<_>>`.
/// NOTE(review): presumably so that the same operator instance can be shared
/// with the matching `FeedbackInputNode` — confirm at the construction site.
struct FeedbackOutputNode<C, I, O, Op>
where
    C: Circuit,
{
    /// Global id of this node.
    id: GlobalNodeId,
    /// The wrapped operator, behind shared, interior-mutable ownership.
    operator: Rc<RefCell<Op>>,
    /// Stream that receives the operator's output on every `eval`.
    output_stream: Stream<C, O>,
    /// Optional stream in the parent circuit; when present it receives the
    /// operator's `get_final_output()` in `clock_end` for scope 0.
    export_stream: Option<Stream<C::Parent, O>>,
    /// Ties the otherwise unused input type `I` to the struct.
    phantom_input: PhantomData<I>,
    /// Labels attached to this node through `set_label`.
    labels: BTreeMap<String, String>,
}
6240
6241impl<C, I, O, Op> FeedbackOutputNode<C, I, O, Op>
6242where
6243 C: Circuit,
6244 Op: StrictUnaryOperator<I, O>,
6245{
6246 fn new(operator: Rc<RefCell<Op>>, circuit: C, id: NodeId) -> Self {
6247 Self {
6248 id: circuit.global_node_id().child(id),
6249 operator,
6250 output_stream: Stream::new(circuit.clone(), id),
6251 export_stream: None,
6252 phantom_input: PhantomData,
6253 labels: BTreeMap::new(),
6254 }
6255 }
6256
6257 fn with_export(operator: Rc<RefCell<Op>>, circuit: C, id: NodeId) -> Self {
6258 let mut result = Self::new(operator, circuit.clone(), id);
6259 result.export_stream = Some(Stream::with_origin(
6260 circuit.parent(),
6261 circuit.allocate_stream_id(),
6262 circuit.node_id(),
6263 GlobalNodeId::child_of(&circuit, id),
6264 ));
6265 result
6266 }
6267
6268 fn output_stream(&self) -> Stream<C, O> {
6269 self.output_stream.clone()
6270 }
6271}
6272
6273impl<C, I, O, Op> Node for FeedbackOutputNode<C, I, O, Op>
6274where
6275 C: Circuit,
6276 I: Data,
6277 O: Clone + 'static,
6278 Op: StrictUnaryOperator<I, O>,
6279{
6280 fn local_id(&self) -> NodeId {
6281 self.id.local_node_id().unwrap()
6282 }
6283
6284 fn global_id(&self) -> &GlobalNodeId {
6285 &self.id
6286 }
6287
6288 fn name(&self) -> Cow<'static, str> {
6289 self.operator.borrow().name()
6290 }
6291
6292 fn is_async(&self) -> bool {
6293 self.operator.borrow().is_async()
6294 }
6295
6296 fn is_input(&self) -> bool {
6297 self.operator.borrow().is_input()
6298 }
6299
6300 fn ready(&self) -> bool {
6301 self.operator.borrow().ready()
6302 }
6303
6304 fn register_ready_callback(&mut self, cb: Box<dyn Fn() + Send + Sync>) {
6305 self.operator.borrow_mut().register_ready_callback(cb);
6306 }
6307
6308 fn eval<'a>(
6309 &'a mut self,
6310 ) -> Pin<Box<dyn Future<Output = Result<Option<Position>, SchedulerError>> + 'a>> {
6311 Box::pin(async {
6312 self.output_stream
6313 .put(self.operator.borrow_mut().get_output());
6314 Ok(None)
6315 })
6316 }
6317
6318 fn start_transaction(&mut self) {
6319 self.operator.borrow_mut().start_transaction();
6320 }
6321
6322 fn flush(&mut self) {
6323 self.operator.borrow_mut().flush();
6324 }
6325
6326 fn is_flush_complete(&self) -> bool {
6327 self.operator.borrow().is_flush_complete()
6328 }
6329
6330 fn clock_start(&mut self, scope: Scope) {
6331 self.operator.borrow_mut().clock_start(scope)
6332 }
6333
6334 fn clock_end(&mut self, scope: Scope) {
6335 if scope == 0
6336 && let Some(export_stream) = &mut self.export_stream
6337 {
6338 export_stream.put(self.operator.borrow_mut().get_final_output());
6339 }
6340 self.operator.borrow_mut().clock_end(scope);
6341 }
6342
6343 fn init(&mut self) {
6344 self.operator.borrow_mut().init(&self.id);
6345 }
6346
6347 fn metadata(&self, _output: &mut OperatorMeta) {
6348 }
6351
6352 fn fixedpoint(&self, scope: Scope) -> bool {
6353 self.operator.borrow().fixedpoint(scope)
6354 }
6355
6356 fn checkpoint(
6357 &mut self,
6358 base: &StoragePath,
6359 files: &mut Vec<Arc<dyn FileCommitter>>,
6360 ) -> Result<(), DbspError> {
6361 self.operator
6362 .borrow_mut()
6363 .checkpoint(base, self.persistent_id().as_deref(), files)
6364 }
6365
6366 fn restore(&mut self, base: &StoragePath) -> Result<(), DbspError> {
6367 self.operator
6368 .borrow_mut()
6369 .restore(base, self.persistent_id().as_deref())
6370 }
6371
6372 fn clear_state(&mut self) -> Result<(), DbspError> {
6373 self.operator.borrow_mut().clear_state()
6374 }
6375
6376 fn start_replay(&mut self) -> Result<(), DbspError> {
6377 self.operator.borrow_mut().start_replay()
6378 }
6379
6380 fn is_replay_complete(&self) -> bool {
6381 self.operator.borrow().is_replay_complete()
6382 }
6383
6384 fn end_replay(&mut self) -> Result<(), DbspError> {
6385 self.operator.borrow_mut().end_replay()
6386 }
6387
6388 fn set_label(&mut self, key: &str, value: &str) {
6389 self.labels.insert(key.to_string(), value.to_string());
6390 }
6391
6392 fn get_label(&self, key: &str) -> Option<&str> {
6393 self.labels.get(key).map(|s| s.as_str())
6394 }
6395
    /// Returns all labels attached to this node.
    fn labels(&self) -> &BTreeMap<String, String> {
        &self.labels
    }
6399
    /// Exposes this node as `&dyn Any` so callers can downcast it.
    fn as_any(&self) -> &dyn Any {
        self
    }
6403}
6404
/// Circuit node that feeds the values of `input_stream` into a strict
/// operator (see the `eval_strict`/`eval_strict_owned` calls in its `Node`
/// implementation).
struct FeedbackInputNode<C, I, O, Op> {
    /// Global id of this node (a child of the input stream's circuit).
    id: GlobalNodeId,
    /// The operator, held behind `Rc<RefCell<_>>` so it can be shared.
    operator: Rc<RefCell<Op>>,
    /// Stream whose values are consumed on every `eval`.
    input_stream: Stream<C, I>,
    /// Ties the operator's output type `O` to the node without storing one.
    phantom_output: PhantomData<O>,
    /// Free-form key/value labels attached to the node.
    labels: BTreeMap<String, String>,
}
6414
6415impl<C, I, O, Op> FeedbackInputNode<C, I, O, Op>
6416where
6417 Op: StrictUnaryOperator<I, O>,
6418 C: Circuit,
6419{
6420 fn new(operator: Rc<RefCell<Op>>, input_stream: Stream<C, I>, id: NodeId) -> Self {
6421 Self {
6422 id: input_stream.circuit().global_node_id().child(id),
6423 operator,
6424 input_stream,
6425 phantom_output: PhantomData,
6426 labels: BTreeMap::new(),
6427 }
6428 }
6429}
6430
6431impl<C, I, O, Op> Node for FeedbackInputNode<C, I, O, Op>
6432where
6433 Op: StrictUnaryOperator<I, O>,
6434 I: Data,
6435 O: 'static,
6436 C: Clone + 'static,
6437{
6438 fn name(&self) -> Cow<'static, str> {
6439 self.operator.borrow().name()
6440 }
6441
6442 fn local_id(&self) -> NodeId {
6443 self.id.local_node_id().unwrap()
6444 }
6445
6446 fn global_id(&self) -> &GlobalNodeId {
6447 &self.id
6448 }
6449
6450 fn is_async(&self) -> bool {
6451 self.operator.borrow().is_async()
6452 }
6453
6454 fn is_input(&self) -> bool {
6455 self.operator.borrow().is_input()
6456 }
6457
6458 fn ready(&self) -> bool {
6459 self.operator.borrow().ready()
6460 }
6461
6462 fn register_ready_callback(&mut self, cb: Box<dyn Fn() + Send + Sync>) {
6463 self.operator.borrow_mut().register_ready_callback(cb);
6464 }
6465
6466 #[allow(clippy::await_holding_refcell_ref)]
6468 fn eval<'a>(
6469 &'a mut self,
6470 ) -> Pin<Box<dyn Future<Output = Result<Option<Position>, SchedulerError>> + 'a>> {
6471 Box::pin(async {
6472 match StreamValue::take(self.input_stream.val()) {
6473 Some(v) => self.operator.borrow_mut().eval_strict_owned(v).await,
6474 None => {
6475 self.operator
6476 .borrow_mut()
6477 .eval_strict(StreamValue::peek(&self.input_stream.get()))
6478 .await
6479 }
6480 };
6481
6482 StreamValue::consume_token(self.input_stream.val());
6483
6484 Ok(None)
6485 })
6486 }
6487
6488 fn start_transaction(&mut self) {
6489 self.operator.borrow_mut().start_transaction();
6490 }
6491
6492 fn flush(&mut self) {
6493 self.operator.borrow_mut().flush();
6494 }
6495
6496 fn is_flush_complete(&self) -> bool {
6497 self.operator.borrow().is_flush_complete()
6498 }
6499
6500 fn clock_start(&mut self, _scope: Scope) {}
6503
6504 fn clock_end(&mut self, _scope: Scope) {}
6505
6506 fn init(&mut self) {
6507 self.operator.borrow_mut().init(&self.id);
6508 }
6509
6510 fn metadata(&self, output: &mut OperatorMeta) {
6511 self.operator.borrow().metadata(output)
6512 }
6513
6514 fn fixedpoint(&self, scope: Scope) -> bool {
6515 self.operator.borrow().fixedpoint(scope)
6516 }
6517
6518 fn checkpoint(
6519 &mut self,
6520 _base: &StoragePath,
6521 _files: &mut Vec<Arc<dyn FileCommitter>>,
6522 ) -> Result<(), DbspError> {
6523 Ok(())
6530 }
6531
6532 fn restore(&mut self, _base: &StoragePath) -> Result<(), DbspError> {
6533 Ok(())
6535 }
6536
6537 fn clear_state(&mut self) -> Result<(), DbspError> {
6538 Ok(())
6539 }
6540
6541 fn start_replay(&mut self) -> Result<(), DbspError> {
6542 self.operator.borrow_mut().start_replay()
6543 }
6544
6545 fn is_replay_complete(&self) -> bool {
6546 self.operator.borrow().is_replay_complete()
6547 }
6548
6549 fn end_replay(&mut self) -> Result<(), DbspError> {
6550 self.operator.borrow_mut().end_replay()
6551 }
6552
6553 fn set_label(&mut self, key: &str, value: &str) {
6554 self.labels.insert(key.to_string(), value.to_string());
6555 }
6556
6557 fn get_label(&self, key: &str) -> Option<&str> {
6558 self.labels.get(key).map(|s| s.as_str())
6559 }
6560
6561 fn labels(&self) -> &BTreeMap<String, String> {
6562 &self.labels
6563 }
6564
6565 fn as_any(&self) -> &dyn Any {
6566 self
6567 }
6568}
6569
/// Handle used to close a feedback loop: it remembers the output node and
/// operator of a feedback connection until an input stream is attached via
/// [`connect`](Self::connect) or
/// [`connect_with_preference`](Self::connect_with_preference).
pub struct FeedbackConnector<C, I, O, Op> {
    /// Id of the node that produces the feedback operator's output.
    output_node_id: NodeId,
    /// Circuit in which the connection is made.
    circuit: C,
    /// The feedback operator, shared via `Rc<RefCell<_>>`.
    operator: Rc<RefCell<Op>>,
    /// Marker for the operator's input type.
    phantom_input: PhantomData<I>,
    /// Marker for the operator's output type.
    phantom_output: PhantomData<O>,
}
6584
impl<C, I, O, Op> FeedbackConnector<C, I, O, Op>
where
    Op: StrictUnaryOperator<I, O>,
{
    /// Creates a connector for the feedback `operator` whose output node in
    /// `circuit` is `output_node_id`.
    fn new(output_node_id: NodeId, circuit: C, operator: Rc<RefCell<Op>>) -> Self {
        Self {
            output_node_id,
            circuit,
            operator,
            phantom_input: PhantomData,
            phantom_output: PhantomData,
        }
    }
}
6599
6600impl<C, I, O, Op> FeedbackConnector<C, I, O, Op>
6601where
6602 Op: StrictUnaryOperator<I, O>,
6603 I: Data,
6604 O: Data,
6605 C: Circuit,
6606{
6607 pub fn operator_mut(&self) -> RefMut<'_, Op> {
6608 self.operator.borrow_mut()
6609 }
6610
6611 pub fn connect(self, input_stream: &Stream<C, I>) {
6616 self.connect_with_preference(input_stream, OwnershipPreference::INDIFFERENT)
6617 }
6618
6619 pub fn connect_with_preference(
6620 self,
6621 input_stream: &Stream<C, I>,
6622 input_preference: OwnershipPreference,
6623 ) {
6624 self.circuit.connect_feedback_with_preference(
6625 self.output_node_id,
6626 self.operator,
6627 input_stream,
6628 input_preference,
6629 )
6630 }
6631}
6632
/// Node that wraps a nested circuit together with the executor that runs it.
struct ChildNode<C>
where
    C: Circuit,
{
    /// Global id of the nested circuit.
    id: GlobalNodeId,
    /// The nested circuit itself.
    circuit: C,
    /// Executor that drives the nested circuit's transactions.
    executor: Box<dyn Executor<C>>,
    /// Free-form key/value labels attached to the node.
    labels: BTreeMap<String, String>,
    /// Offset added to scope arguments before they are passed to the
    /// nested circuit.
    nesting_depth: Scope,
}
6644
impl<C> Drop for ChildNode<C>
where
    C: Circuit,
{
    /// Clears the nested circuit when the node is dropped.
    fn drop(&mut self) {
        self.circuit.clear();
    }
}
6655
6656impl<C> ChildNode<C>
6657where
6658 C: Circuit,
6659{
6660 fn new<E>(circuit: C, nesting_depth: Scope, executor: E) -> Self
6661 where
6662 E: Executor<C>,
6663 {
6664 Self {
6665 id: circuit.global_node_id(),
6666 circuit,
6667 executor: Box::new(executor) as Box<dyn Executor<C>>,
6668 labels: BTreeMap::new(),
6669 nesting_depth,
6670 }
6671 }
6672}
6673
6674impl<C> Node for ChildNode<C>
6675where
6676 C: Circuit,
6677{
6678 fn name(&self) -> Cow<'static, str> {
6679 Cow::Borrowed("Subcircuit")
6680 }
6681
6682 fn local_id(&self) -> NodeId {
6683 self.id.local_node_id().unwrap()
6684 }
6685
6686 fn global_id(&self) -> &GlobalNodeId {
6687 &self.id
6688 }
6689
6690 fn is_circuit(&self) -> bool {
6691 true
6692 }
6693
6694 fn is_async(&self) -> bool {
6695 false
6696 }
6697
6698 fn is_input(&self) -> bool {
6699 false
6700 }
6701
6702 fn ready(&self) -> bool {
6703 true
6704 }
6705
6706 fn eval<'a>(
6707 &'a mut self,
6708 ) -> Pin<Box<dyn Future<Output = Result<Option<Position>, SchedulerError>> + 'a>> {
6709 for node_id in self.circuit.import_nodes() {
6712 self.circuit.eval_import_node(node_id)
6713 }
6714 Box::pin(async {
6715 self.executor.transaction(&self.circuit).await?;
6716 Ok(None)
6717 })
6718 }
6719
6720 fn start_transaction(&mut self) {
6721 }
6723
6724 fn flush(&mut self) {
6725 self.executor.start_commit_transaction().unwrap();
6726 }
6727
6728 fn is_flush_complete(&self) -> bool {
6729 self.executor.is_commit_complete()
6730 }
6731
6732 fn clock_start(&mut self, scope: Scope) {
6733 self.circuit.clock_start(scope + self.nesting_depth);
6734 }
6735
6736 fn clock_end(&mut self, scope: Scope) {
6737 self.circuit.clock_end(scope + self.nesting_depth);
6738 }
6739
6740 fn metadata(&self, _meta: &mut OperatorMeta) {}
6741
6742 fn fixedpoint(&self, scope: Scope) -> bool {
6743 self.circuit.check_fixedpoint(scope + self.nesting_depth)
6744 }
6745
6746 fn map_nodes_recursive(
6747 &self,
6748 f: &mut dyn FnMut(&dyn Node) -> Result<(), DbspError>,
6749 ) -> Result<(), DbspError> {
6750 self.circuit.map_nodes_recursive(f)
6751 }
6752
6753 fn checkpoint(
6754 &mut self,
6755 _base: &StoragePath,
6756 _files: &mut Vec<Arc<dyn FileCommitter>>,
6757 ) -> Result<(), DbspError> {
6758 Ok(())
6759 }
6760
6761 fn restore(&mut self, _base: &StoragePath) -> Result<(), DbspError> {
6762 Ok(())
6763 }
6764
6765 fn clear_state(&mut self) -> Result<(), DbspError> {
6766 self.circuit
6767 .map_local_nodes_mut(&mut |node| node.clear_state())
6768 }
6769
6770 fn start_replay(&mut self) -> Result<(), DbspError> {
6771 Ok(())
6772 }
6773
6774 fn is_replay_complete(&self) -> bool {
6775 true
6776 }
6777
6778 fn end_replay(&mut self) -> Result<(), DbspError> {
6779 Ok(())
6780 }
6781
6782 fn set_label(&mut self, key: &str, value: &str) {
6783 self.labels.insert(key.to_string(), value.to_string());
6784 }
6785
6786 fn get_label(&self, key: &str) -> Option<&str> {
6787 self.labels.get(key).map(|s| s.as_str())
6788 }
6789
6790 fn labels(&self) -> &BTreeMap<String, String> {
6791 &self.labels
6792 }
6793
6794 fn map_child(&self, path: &[NodeId], f: &mut dyn FnMut(&dyn Node)) {
6795 self.circuit.map_node_relative(path, f);
6796 }
6797
6798 fn map_child_mut(&self, path: &[NodeId], f: &mut dyn FnMut(&mut dyn Node)) {
6799 self.circuit.map_node_mut_relative(path, f);
6800 }
6801
6802 fn as_any(&self) -> &dyn Any {
6803 self
6804 }
6805
6806 fn as_circuit(&self) -> Option<&dyn CircuitBase> {
6807 Some(&self.circuit)
6808 }
6809}
6810
/// Handle to a root circuit together with everything needed to run it.
pub struct CircuitHandle {
    /// The root circuit.
    circuit: RootCircuit,
    /// Executor that drives transactions and steps of the circuit.
    executor: Box<dyn Executor<RootCircuit>>,
    /// Tokio runtime on which the executor's futures are blocked on.
    tokio_runtime: TokioRuntime,
    /// Set by `restore` when a replay/backfill is required; taken again by
    /// `complete_replay`.
    replay_info: Option<BootstrapInfo>,
}
6822
impl Drop for CircuitHandle {
    /// Shuts the circuit down: logs a clock-end scheduler event, ends the
    /// top-level clock, and clears the circuit.
    fn drop(&mut self) {
        self.circuit
            .log_scheduler_event(&SchedulerEvent::clock_end());

        // Skip `clock_end` while unwinding from a panic.
        // NOTE(review): presumably to avoid a second panic during unwinding — confirm.
        if !panicking() {
            self.circuit.clock_end(0)
        }

        // The circuit is cleared even when panicking.
        self.circuit.clear();
    }
}
6842
/// Describes the replay/backfill that `CircuitHandle::restore` determined to
/// be necessary.
#[derive(Debug, Clone, PartialEq, Eq, Serialize)]
pub struct BootstrapInfo {
    /// Nodes that act as replay sources, mapped to the id of their replay
    /// stream.
    pub replay_sources: BTreeMap<NodeId, StreamId>,

    /// Nodes that need to be backfilled, mapped to their persistent id label
    /// (if the node has one).
    #[allow(dead_code)]
    pub need_backfill: BTreeMap<NodeId, Option<String>>,
}
6853
6854impl CircuitHandle {
6855 pub fn transaction(&self) -> Result<(), DbspError> {
6857 self.tokio_runtime
6858 .block_on(async {
6859 let local_set = LocalSet::new();
6860 local_set
6861 .run_until(async { self.executor.transaction(&self.circuit).await })
6862 .await
6863 })
6864 .map_err(DbspError::Scheduler)
6865 }
6866
6867 pub fn start_transaction(&self) -> Result<(), DbspError> {
6872 self.tokio_runtime
6873 .block_on(async {
6874 let local_set = LocalSet::new();
6875 local_set
6876 .run_until(async { self.executor.start_transaction(&self.circuit).await })
6877 .await
6878 })
6879 .map_err(DbspError::Scheduler)
6880 }
6881
    /// Asks the executor to start committing the current transaction.
    ///
    /// # Errors
    ///
    /// Scheduler errors are returned as `DbspError::Scheduler`.
    pub fn start_commit_transaction(&self) -> Result<(), DbspError> {
        self.executor
            .start_commit_transaction()
            .map_err(DbspError::Scheduler)
    }
6891
    /// Returns whether the executor reports the commit as complete.
    pub fn is_commit_complete(&self) -> bool {
        self.executor.is_commit_complete()
    }
6895
    /// Returns the executor's current commit progress.
    pub fn commit_progress(&self) -> CommitProgress {
        self.executor.commit_progress()
    }
6899
6900 pub fn step(&self) -> Result<(), DbspError> {
6902 self.tokio_runtime
6903 .block_on(async {
6904 let local_set = LocalSet::new();
6905 local_set
6906 .run_until(async { self.executor.step(&self.circuit).await })
6907 .await
6908 })
6909 .map_err(DbspError::Scheduler)
6910 }
6911
    /// Checkpoints every node of the circuit (recursively) under `base`.
    ///
    /// Files produced by the nodes are appended to `files`. The walk stops at
    /// the first node that returns an error.
    pub fn checkpoint(
        &mut self,
        base: &StoragePath,
        files: &mut Vec<Arc<dyn FileCommitter>>,
    ) -> Result<(), DbspError> {
        self.circuit
            .map_nodes_recursive_mut(&mut |node: &mut dyn Node| {
                // Each node's checkpoint runs through the commit-latency
                // metric (presumably recording how long it took — confirm
                // against `record_callback`).
                DBSP_OPERATOR_COMMIT_LATENCY_MICROSECONDS
                    .record_callback(|| node.checkpoint(base, files))
            })
    }
6955
6956 pub fn restore(&mut self, base: &StoragePath) -> Result<Option<BootstrapInfo>, DbspError> {
6979 let mut replay_sources: BTreeMap<NodeId, StreamId> = BTreeMap::new();
6981
6982 let mut need_backfill: BTreeSet<GlobalNodeId> = BTreeSet::new();
6984
6985 self.circuit.map_nodes_recursive_mut(
6992 &mut |node: &mut dyn Node| match node.restore(base) {
6993 Err(e) if Runtime::mode() == Mode::Ephemeral => Err(e),
6994 Err(DbspError::Storage(ioerror)) if ioerror.kind() == ErrorKind::NotFound => {
6995 need_backfill.insert(node.global_id().clone());
6996 Ok(())
6997 }
6998 Err(DbspError::IO(ioerror)) if ioerror.kind() == ErrorKind::NotFound => {
6999 need_backfill.insert(node.global_id().clone());
7000 Ok(())
7001 }
7002 Err(e) => Err(e),
7003 Ok(()) => Ok(()),
7004 },
7005 )?;
7006
7007 let additional_need_backfill: BTreeSet<GlobalNodeId> =
7009 self.invalidate_balancer_clusters(&need_backfill);
7010 if Runtime::worker_index() == 0 {
7011 debug!(
7012 "CircuitHandle::restore: additional need backfill: {:?}",
7013 additional_need_backfill
7014 );
7015 }
7016 need_backfill.extend(additional_need_backfill);
7017
7018 debug!(
7019 "worker {}: CircuitHandle::restore: found {} operators that require backfill: {:?}",
7020 Runtime::worker_index(),
7021 need_backfill.len(),
7022 need_backfill.iter().cloned().collect::<Vec<GlobalNodeId>>()
7023 );
7024
7025 let need_backfill = need_backfill
7029 .into_iter()
7030 .map(|gid| gid.top_level_ancestor())
7031 .collect::<BTreeSet<_>>();
7032
7033 let mut participate_in_backfill = need_backfill.clone();
7039
7040 let mut participate_in_backfill_new = need_backfill.clone();
7042
7043 while !participate_in_backfill_new.is_empty() {
7044 participate_in_backfill_new = self.compute_replay_nodes_step(
7045 &mut replay_sources,
7046 &need_backfill,
7047 participate_in_backfill_new,
7048 &mut participate_in_backfill,
7049 )?;
7050 }
7051
7052 debug!(
7053 "worker {}: CircuitHandle::restore: replaying {} operators: {:?}\n backfilling {} operators: {:?}\n replay circuit consists of {} operators: {:?}",
7054 Runtime::worker_index(),
7055 replay_sources.len(),
7056 replay_sources.keys().cloned().collect::<Vec<NodeId>>(),
7057 need_backfill.len(),
7058 need_backfill.iter().cloned().collect::<Vec<NodeId>>(),
7059 participate_in_backfill.len(),
7060 participate_in_backfill
7061 .iter()
7062 .cloned()
7063 .collect::<Vec<NodeId>>()
7064 );
7065
7066 assert!(
7067 replay_sources
7068 .keys()
7069 .cloned()
7070 .collect::<BTreeSet<_>>()
7071 .intersection(&need_backfill)
7072 .collect::<Vec<_>>()
7073 .is_empty()
7074 );
7075
7076 let nodes_to_backfill = participate_in_backfill
7079 .difference(&replay_sources.keys().cloned().collect::<BTreeSet<_>>())
7080 .cloned()
7081 .collect::<BTreeSet<_>>();
7082
7083 if !participate_in_backfill.is_empty() {
7084 for node_id in replay_sources.keys() {
7086 self.circuit
7087 .map_local_node_mut(*node_id, &mut |node| node.start_replay())?;
7088 }
7089
7090 for node_id in nodes_to_backfill.iter() {
7092 self.circuit
7093 .map_local_node_mut(*node_id, &mut |node| node.clear_state())?;
7094 }
7095
7096 self.executor
7098 .prepare(&self.circuit, Some(&participate_in_backfill))?;
7099
7100 let need_backfill = nodes_to_backfill
7141 .iter()
7142 .map(|node_id| {
7143 let pid = self.circuit.map_local_node_mut(*node_id, &mut |node| {
7144 node.get_label(LABEL_PERSISTENT_OPERATOR_ID)
7145 .map(|s| s.to_string())
7146 });
7147
7148 (*node_id, pid)
7149 })
7150 .collect::<BTreeMap<_, _>>();
7151
7152 let replay_info = BootstrapInfo {
7153 replay_sources: replay_sources.clone(),
7154 need_backfill,
7155 };
7156
7157 self.replay_info = Some(replay_info.clone());
7158
7159 Ok(Some(replay_info))
7160 } else {
7161 Ok(None)
7162 }
7163 }
7164
7165 fn invalidate_balancer_clusters(
7195 &self,
7196 need_backfill: &BTreeSet<GlobalNodeId>,
7197 ) -> BTreeSet<GlobalNodeId> {
7198 let need_backfill_node_ids: BTreeSet<NodeId> = need_backfill
7200 .iter()
7201 .map(|gid| gid.top_level_ancestor())
7202 .collect();
7203
7204 let additional_need_backfill = self
7206 .circuit
7207 .balancer()
7208 .invalidate_clusters_for_bootstrapping(&need_backfill_node_ids);
7209
7210 let nodes_to_add = self.propagate_need_backfill_forward(
7213 additional_need_backfill
7214 .difference(&need_backfill_node_ids)
7215 .cloned()
7216 .collect(),
7217 );
7218
7219 nodes_to_add
7221 .into_iter()
7222 .map(|node_id| GlobalNodeId::root().child(node_id))
7223 .collect()
7224 }
7225
7226 fn propagate_need_backfill_forward(
7228 &self,
7229 mut need_backfill: BTreeSet<NodeId>,
7230 ) -> BTreeSet<NodeId> {
7231 let mut worklist: Vec<NodeId> = need_backfill.iter().cloned().collect();
7233 let mut visited = BTreeSet::new();
7234
7235 while let Some(node_id) = worklist.pop() {
7236 if visited.contains(&node_id) {
7237 continue;
7238 }
7239 visited.insert(node_id);
7240
7241 let successors: Vec<NodeId> = self
7243 .circuit
7244 .edges()
7245 .by_source
7246 .get(&node_id)
7247 .into_iter()
7248 .flat_map(|edges| edges.iter().map(|edge| edge.to))
7249 .collect();
7250
7251 for successor in successors {
7252 if !visited.contains(&successor) {
7253 worklist.push(successor);
7254 need_backfill.insert(successor);
7255 }
7256 }
7257
7258 let dependencies: Vec<NodeId> = self
7260 .circuit
7261 .edges()
7262 .by_destination
7263 .get(&node_id)
7264 .into_iter()
7265 .flat_map(|edges| edges.iter())
7266 .filter(|edge| edge.is_dependency())
7267 .map(|edge| edge.from)
7268 .collect();
7269
7270 for dependency in dependencies {
7271 if !visited.contains(&dependency) {
7272 worklist.push(dependency);
7273 need_backfill.insert(dependency);
7274 }
7275 }
7276 }
7277
7278 need_backfill
7279 }
7280
7281 fn compute_replay_nodes_step(
7292 &self,
7293 replay_sources: &mut BTreeMap<NodeId, StreamId>,
7294 need_backfill: &BTreeSet<NodeId>,
7295 participate_in_backfill_new: BTreeSet<NodeId>,
7296 participate_in_backfill: &mut BTreeSet<NodeId>,
7297 ) -> Result<BTreeSet<NodeId>, DbspError> {
7298 let mut inputs = BTreeSet::new();
7299
7300 for node_id in participate_in_backfill_new.iter() {
7301 let node_inputs = self
7310 .circuit
7311 .edges()
7312 .by_destination
7313 .get(node_id)
7314 .iter()
7315 .flat_map(|edges| edges.iter())
7316 .filter(|edge| edge.is_stream())
7317 .map(|edge| {
7318 (Some(edge.stream_id().unwrap()), edge.from)
7322 })
7323 .collect::<Vec<_>>();
7324
7325 for input in node_inputs.into_iter() {
7326 inputs.insert(input);
7327 }
7328
7329 for edge in self.circuit.edges().dependencies_of(*node_id) {
7331 inputs.insert((None, edge.from));
7332 }
7333
7334 for edge in self.circuit.edges().depend_on(*node_id) {
7336 inputs.insert((None, edge.to));
7337 }
7338 }
7339
7340 let mut participate_in_backfill_new = BTreeSet::new();
7341
7342 let mut replay_streams = BTreeMap::new();
7343
7344 for (stream_id, mut node_id) in inputs.into_iter() {
7345 if let Some(stream_id) = stream_id
7350 && let Some(replay_source) = self.circuit.get_replay_source(stream_id)
7351 {
7352 if !need_backfill.contains(&replay_source.local_node_id()) {
7355 replay_streams.insert(stream_id, replay_source.clone());
7356 node_id = replay_source.local_node_id();
7364 }
7365 }
7366
7367 if !participate_in_backfill.contains(&node_id) {
7368 participate_in_backfill.insert(node_id);
7370 participate_in_backfill_new.insert(node_id);
7371 }
7372 }
7373
7374 for (original_stream, replay_stream) in replay_streams.into_iter() {
7376 replay_sources
7377 .entry(replay_stream.local_node_id())
7378 .or_insert_with(|| {
7379 self.circuit
7380 .add_replay_edges(original_stream, replay_stream.as_ref());
7381 replay_stream.stream_id()
7382 });
7383 }
7384
7385 Ok(participate_in_backfill_new)
7386 }
7387
7388 pub fn is_replay_complete(&self) -> bool {
7390 let Some(replay_info) = self.replay_info.as_ref() else {
7391 return true;
7392 };
7393
7394 replay_info.replay_sources.keys().all(|node_id| {
7395 self.circuit
7396 .map_local_node_mut(*node_id, &mut |node| node.is_replay_complete())
7397 })
7398 }
7399
7400 pub fn complete_replay(&mut self) -> Result<(), DbspError> {
7406 let Some(replay_info) = self.replay_info.take() else {
7409 return Ok(());
7410 };
7411
7412 for (node_id, stream_id) in replay_info.replay_sources.iter() {
7414 self.circuit
7415 .map_local_node_mut(*node_id, &mut |node| node.end_replay())?;
7416 self.circuit.edges_mut().delete_stream(*stream_id);
7417 }
7418
7419 self.executor.prepare(&self.circuit, None)?;
7421
7422 Ok(())
7448 }
7449
7450 pub fn fingerprint(&self) -> u64 {
7451 let mut fip = Fingerprinter::default();
7452 let _ = self.circuit.map_nodes_recursive(&mut |node: &dyn Node| {
7453 node.fingerprint(&mut fip);
7454 Ok(())
7455 });
7456 fip.finish()
7457 }
7458
    /// Registers `handler` under `name` with the circuit; see the circuit's
    /// `register_scheduler_event_handler` for the exact semantics.
    pub fn register_scheduler_event_handler<F>(&self, name: &str, handler: F)
    where
        F: FnMut(&SchedulerEvent<'_>) + 'static,
    {
        self.circuit.register_scheduler_event_handler(name, handler);
    }
7474
    /// Unregisters the scheduler event handler called `name` and returns the
    /// circuit's result (presumably `true` if a handler was removed — confirm).
    pub fn unregister_scheduler_event_handler(&self, name: &str) -> bool {
        self.circuit.unregister_scheduler_event_handler(name)
    }
7483
7484 pub fn lir(&self) -> LirCircuit {
7486 (&self.circuit as &dyn CircuitBase).to_lir()
7487 }
7488
    /// Sets the balancer `hint` for the node identified by `global_node_id`,
    /// delegating to the circuit.
    pub fn set_balancer_hint(
        &self,
        global_node_id: &GlobalNodeId,
        hint: BalancerHint,
    ) -> Result<(), DbspError> {
        self.circuit.set_balancer_hint(global_node_id, hint)
    }
7496
7497 pub fn get_current_balancer_policy(&self) -> BTreeMap<GlobalNodeId, PartitioningPolicy> {
7498 self.circuit
7499 .get_current_balancer_policy()
7500 .into_iter()
7501 .map(|(node_id, policy)| (GlobalNodeId::root().child(node_id), policy))
7502 .collect()
7503 }
7504}
7505
7506#[cfg(test)]
7507mod tests {
7508 use crate::{
7509 Circuit, Error as DbspError, RootCircuit,
7510 circuit::schedule::{DynamicScheduler, Scheduler},
7511 monitor::TraceMonitor,
7512 operator::{Generator, Z1},
7513 };
7514 use anyhow::anyhow;
7515 use std::{cell::RefCell, ops::Deref, rc::Rc, vec::Vec};
7516
    // Runs the integrator test with the dynamic scheduler.
    #[test]
    fn sum_circuit_dynamic() {
        sum_circuit::<DynamicScheduler>();
    }
7521 fn sum_circuit<S>()
7523 where
7524 S: Scheduler + 'static,
7525 {
7526 let actual_output: Rc<RefCell<Vec<isize>>> = Rc::new(RefCell::new(Vec::with_capacity(100)));
7527 let actual_output_clone = actual_output.clone();
7528 let circuit = RootCircuit::build_with_scheduler::<_, _, S>(|circuit| {
7529 TraceMonitor::new_panic_on_error().attach(circuit, "monitor");
7530 let mut n: isize = 0;
7531 let source = circuit.add_source(Generator::new(move || {
7532 let result = n;
7533 n += 1;
7534 result
7535 }));
7536 let integrator = source.integrate();
7537 integrator.inspect(|n| println!("{}", n));
7538 integrator.inspect(move |n| actual_output_clone.borrow_mut().push(*n));
7539 Ok(())
7540 })
7541 .unwrap()
7542 .0;
7543
7544 for _ in 0..100 {
7545 circuit.transaction().unwrap();
7546 }
7547
7548 let mut sum = 0;
7549 let mut expected_output: Vec<isize> = Vec::with_capacity(100);
7550 for i in 0..100 {
7551 sum += i;
7552 expected_output.push(sum);
7553 }
7554 assert_eq!(&expected_output, actual_output.borrow().deref());
7555 }
7556
    // Runs the feedback-based sum test with the dynamic scheduler.
    #[test]
    fn recursive_sum_circuit_dynamic() {
        recursive_sum_circuit::<DynamicScheduler>()
    }
7561
7562 fn recursive_sum_circuit<S>()
7563 where
7564 S: Scheduler + 'static,
7565 {
7566 let actual_output: Rc<RefCell<Vec<usize>>> = Rc::new(RefCell::new(Vec::with_capacity(100)));
7567 let actual_output_clone = actual_output.clone();
7568
7569 let circuit = RootCircuit::build_with_scheduler::<_, _, S>(|circuit| {
7570 TraceMonitor::new_panic_on_error().attach(circuit, "monitor");
7571
7572 let mut n: usize = 0;
7573 let source = circuit.add_source(Generator::new(move || {
7574 let result = n;
7575 n += 1;
7576 result
7577 }));
7578 let (z1_output, z1_feedback) = circuit.add_feedback(Z1::new(0));
7579 let plus = source
7580 .apply2(&z1_output, |n1: &usize, n2: &usize| *n1 + *n2)
7581 .inspect(move |n| actual_output_clone.borrow_mut().push(*n));
7582 z1_feedback.connect(&plus);
7583 Ok(())
7584 })
7585 .unwrap()
7586 .0;
7587
7588 for _ in 0..100 {
7589 circuit.transaction().unwrap();
7590 }
7591
7592 let mut sum = 0;
7593 let mut expected_output: Vec<usize> = Vec::with_capacity(100);
7594 for i in 0..100 {
7595 sum += i;
7596 expected_output.push(sum);
7597 }
7598 assert_eq!(&expected_output, actual_output.borrow().deref());
7599 }
7600
    // Runs the nested-circuit factorial test with the dynamic scheduler.
    #[test]
    fn factorial_dynamic() {
        factorial::<DynamicScheduler>();
    }
7605
7606 fn factorial<S>()
7612 where
7613 S: Scheduler + 'static,
7614 {
7615 let actual_output: Rc<RefCell<Vec<usize>>> = Rc::new(RefCell::new(Vec::with_capacity(100)));
7616 let actual_output_clone = actual_output.clone();
7617
7618 let circuit = RootCircuit::build_with_scheduler::<_, _, S>(|circuit| {
7619 TraceMonitor::new_panic_on_error().attach(circuit, "monitor");
7620
7621 let mut n: usize = 0;
7622 let source = circuit.add_source(Generator::new(move || {
7623 n += 1;
7624 n
7625 }));
7626 let fact = circuit
7627 .iterate_with_condition_and_scheduler::<_, _, S>(|child| {
7628 let mut counter = 0;
7629 let countdown = source.delta0(child).apply_mut(move |parent_val| {
7630 if *parent_val > 0 {
7631 counter = *parent_val;
7632 };
7633 let res = counter;
7634 counter -= 1;
7635 res
7636 });
7637 let (z1_output, z1_feedback) = child.add_feedback_with_export(Z1::new(1));
7638 let mul = countdown.apply2(&z1_output.local, |n1: &usize, n2: &usize| n1 * n2);
7639 z1_feedback.connect(&mul);
7640 Ok((countdown.condition(|n| *n <= 1), z1_output.export))
7641 })
7642 .unwrap();
7643 fact.inspect(move |n| actual_output_clone.borrow_mut().push(*n));
7644 Ok(())
7645 })
7646 .unwrap()
7647 .0;
7648
7649 for _ in 1..10 {
7650 circuit.transaction().unwrap();
7651 }
7652
7653 let mut expected_output: Vec<usize> = Vec::with_capacity(10);
7654 for i in 1..10 {
7655 expected_output.push(my_factorial(i));
7656 }
7657 assert_eq!(&expected_output, actual_output.borrow().deref());
7658 }
7659
7660 fn my_factorial(n: usize) -> usize {
7661 if n == 1 { 1 } else { n * my_factorial(n - 1) }
7662 }
7663
7664 #[test]
7665 fn init_circuit_constructor_error() {
7666 match RootCircuit::build(|_circuit| Err::<(), _>(anyhow!("constructor failed"))) {
7667 Err(DbspError::Constructor(msg)) => assert_eq!(msg.to_string(), "constructor failed"),
7668 _ => panic!(),
7669 }
7670 }
7671}