1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
use crate::Timestamp;
use crate::circuit::checkpointer::Checkpoint;
use crate::circuit::circuit_builder::IterativeCircuit;
use crate::{
ChildCircuit, Circuit, DBData, SchedulerError, Stream, ZWeight,
dynamic::Erase,
operator::dynamic::{
distinct::DistinctFactories, recursive::RecursiveStreams as DynRecursiveStreams,
},
typed_batch::{DynIndexedZSet, TypedBatch},
};
use impl_trait_for_tuples::impl_for_tuples;
pub trait RecursiveStreams<C>: Clone {
type Inner: DynRecursiveStreams<C> + Clone;
type Output;
/// Returns a strongly typed version of the streams.
///
/// # Safety
///
/// `inner` must be backed by concrete types that match `Self`.
unsafe fn typed(inner: &Self::Inner) -> Self;
/// Returns a strongly typed version of output streams.
///
/// # Safety
///
/// `inner` must be backed by concrete types that match `Self::Output`.
unsafe fn typed_exports(
inner: &<Self::Inner as DynRecursiveStreams<C>>::Output,
) -> Self::Output;
fn inner(&self) -> Self::Inner;
fn factories() -> <Self::Inner as DynRecursiveStreams<C>>::Factories;
}
impl<K, V, B, C> RecursiveStreams<C> for Stream<C, TypedBatch<K, V, ZWeight, B>>
where
C: Circuit,
C::Parent: Circuit,
B: Checkpoint + DynIndexedZSet + Send + Sync,
K: DBData + Erase<B::Key>,
V: DBData + Erase<B::Val>,
{
type Inner = Stream<C, B>;
type Output = Stream<C::Parent, TypedBatch<K, V, ZWeight, B>>;
unsafe fn typed(inner: &Self::Inner) -> Self {
Stream::typed(inner)
}
unsafe fn typed_exports(
inner: &<Self::Inner as DynRecursiveStreams<C>>::Output,
) -> Self::Output {
Stream::typed(inner)
}
fn inner(&self) -> Self::Inner {
self.inner()
}
fn factories() -> <Self::Inner as DynRecursiveStreams<C>>::Factories {
DistinctFactories::new::<K, V>()
}
}
#[allow(clippy::unused_unit)]
#[impl_for_tuples(14)]
#[tuple_types_custom_trait_bound(RecursiveStreams<C>)]
impl<C> RecursiveStreams<C> for Tuple {
for_tuples!( type Inner = ( #( Tuple::Inner ),* ); );
for_tuples!( type Output = ( #( Tuple::Output ),* ); );
unsafe fn typed(inner: &Self::Inner) -> Self {
(for_tuples!( #( Tuple::typed(&inner.Tuple) ),* ))
}
unsafe fn typed_exports(
inner: &<Self::Inner as DynRecursiveStreams<C>>::Output,
) -> Self::Output {
(for_tuples!( #( Tuple::typed_exports(&inner.Tuple) ),* ))
}
fn inner(&self) -> Self::Inner {
(for_tuples!( #( self.Tuple.inner() ),* ))
}
fn factories() -> <Self::Inner as DynRecursiveStreams<C>>::Factories {
(for_tuples!( #( Tuple::factories() ),* ))
}
}
impl<P, T> ChildCircuit<P, T>
where
P: 'static,
T: Timestamp,
Self: Circuit,
{
/// Create a nested circuit that computes one or more mutually recursive
/// streams of Z-sets.
///
/// This method implements a common form of iteration that computes a
/// solution to an equation `x = f(i, x)` as a fixed point of function
/// `f`. Here `x` is a single Z-set or multiple mutually recursive
/// Z-sets. The computation is maintained incrementally: at each clock
/// cycle, the parent circuit feeds an update `Δi` to the external input
/// `i` of the nested circuit, and the nested circuit computes `Δx = y
/// - x`, where `y` is a solution to the equation `y = f(i+Δi, y)`.
///
/// This method is a wrapper around [`Circuit::fixedpoint`] that
/// conceptually constructs the following circuit (the exact circuit is
/// somewhat different as it takes care of maintaining the computation
/// incrementally):
///
/// ```text
/// ┌────────────────────────────────────────┐
/// │ │
/// i │ ┌───┐ │
/// ────┼──►δ0──────►│ │ ┌────────┐ │
/// │ │ f ├─────►│distinct├──┬────┼──►
/// │ ┌──────►│ │ └────────┘ │ │
/// │ │ └───┘ │ │
/// │ │ │ │
/// │ │ │ │
/// │ │ ┌────┐ │ │
/// │ └───────┤z^-1│◄────────────────┘ │
/// │ └────┘ │
/// │ │
/// └────────────────────────────────────────┘
/// ```
///
/// where the `z^-1` operator connects the previous output of function `f`
/// to its input at the next iteration of the fixed point computation.
///
/// Note the `distinct` operator attached to the output of `f`. Most
/// recursive computations over Z-sets require this for convergence;
/// otherwise their output weights keep growing even when the set of
/// elements in the Z-set no longer changes. Hence, strictly speaking
/// this circuit computes the fixed point of equation
/// `y = distinct(f(i+Δi, y))`.
///
/// Finally, the `δ0` block in the diagram represents the
/// [`delta0`](`crate::circuit::Stream::delta0`) operator, which imports
/// streams from the parent circuit into the nested circuit. This
/// operator must be instantiated manually by the closure `f` for each
/// input stream.
///
/// # Examples
///
/// ```
/// use dbsp::{
/// operator::Generator,
/// OrdZSet,
/// Circuit, RootCircuit, Stream, zset, zset_set,
/// utils::Tup2,
/// Error as DbspError, Runtime
/// };
///
/// const STEPS: usize = 3;
///
/// // Propagate labels along graph edges.
/// let (mut circuit_handle, _output_handle) = Runtime::init_circuit(1, move |root_circuit| {
/// // Graph topology.
/// let mut edges = ([
/// // Start with four nodes connected in a cycle.
/// zset_set! { Tup2(1, 2), Tup2(2, 3), Tup2(3, 4), Tup2(4, 1) },
/// // Add an edge.
/// zset_set! { Tup2(4, 5) },
/// // Remove an edge, breaking the cycle.
/// zset! { Tup2(1, 2) => -1 },
/// ] as [_; STEPS])
/// .into_iter();
///
/// let edges = root_circuit
/// .add_source(Generator::new(move || edges.next().unwrap()));
///
/// // Initial labeling of the graph.
/// let mut init_labels = ([
/// // Start with a single label on node 1.
/// zset_set! { Tup2(1, "l1".to_string()) },
/// // Add a label to node 2.
/// zset_set! { Tup2(2, "l2".to_string()) },
/// zset! { },
/// ] as [_; STEPS])
/// .into_iter();
///
/// let init_labels = root_circuit
/// .add_source(Generator::new(move || init_labels.next().unwrap()));
///
/// // Expected _changes_ to the output graph labeling after each clock cycle.
/// let mut expected_outputs = ([
/// zset! { Tup2(1, "l1".to_string()) => 1, Tup2(2, "l1".to_string()) => 1, Tup2(3, "l1".to_string()) => 1, Tup2(4, "l1".to_string()) => 1 },
/// zset! { Tup2(1, "l2".to_string()) => 1, Tup2(2, "l2".to_string()) => 1, Tup2(3, "l2".to_string()) => 1, Tup2(4, "l2".to_string()) => 1, Tup2(5, "l1".to_string()) => 1, Tup2(5, "l2".to_string()) => 1 },
/// zset! { Tup2(2, "l1".to_string()) => -1, Tup2(3, "l1".to_string()) => -1, Tup2(4, "l1".to_string()) => -1, Tup2(5, "l1".to_string()) => -1 },
/// ] as [_; STEPS])
/// .into_iter();
///
/// let labels = root_circuit.recursive(|child_circuit, labels: Stream<_, OrdZSet<Tup2<u64, String>>>| {
/// // Import `edges` and `init_labels` relations from the parent circuit.
/// let edges = edges.delta0(child_circuit);
/// let init_labels = init_labels.delta0(child_circuit);
///
/// // Given an edge `from -> to` where the `from` node is labeled with `l`,
/// // propagate `l` to node `to`.
/// let result = labels.map_index(|Tup2(x,y)| (x.clone(), y.clone()))
/// .join(
/// &edges.map_index(|Tup2(x,y)| (x.clone(), y.clone())),
/// |_from, l, to| Tup2(*to, l.clone()),
/// )
/// .plus(&init_labels);
/// Ok(result)
/// })?;
///
/// labels.inspect(move |ls| {
/// assert_eq!(*ls, expected_outputs.next().unwrap());
/// });
///
/// Ok(labels.output())
/// })?;
///
/// for _ in 0..STEPS {
/// circuit_handle.transaction().unwrap();
/// }
///
/// Ok::<(), DbspError>(())
/// ```
#[track_caller]
pub fn recursive<F, S>(&self, f: F) -> Result<S::Output, SchedulerError>
where
S: RecursiveStreams<IterativeCircuit<Self>>,
F: FnOnce(&IterativeCircuit<Self>, S) -> Result<S, SchedulerError>,
{
self.dyn_recursive(&S::factories(), |circuit, streams: S::Inner| {
f(circuit, unsafe { S::typed(&streams) }).map(|streams| streams.inner())
})
.map(|streams| unsafe { S::typed_exports(&streams) })
}
}