dbsp/operator/integrate.rs
1//! Integration operators.
2
3use crate::circuit::checkpointer::Checkpoint;
4use crate::circuit::circuit_builder::StreamId;
5use crate::dynamic::Erase;
6use crate::typed_batch::TypedBatch;
7use crate::{ChildCircuit, DBData, DBWeight, Timestamp};
8use crate::{
9 NumEntries,
10 algebra::{AddAssignByRef, AddByRef, HasZero, IndexedZSet as DynIndexedZSet},
11 circuit::{Circuit, OwnershipPreference, Stream},
12 circuit_cache_key,
13 operator::{
14 Plus,
15 differentiate::DifferentiateId,
16 z1::{DelayedFeedback, DelayedNestedFeedback},
17 },
18};
19use size_of::SizeOf;
20
21circuit_cache_key!(IntegralId<C, D>(StreamId => Stream<C, D>));
22circuit_cache_key!(NestedIntegralId<C, D>(StreamId => Stream<C, D>));
23
24impl<C, D> Stream<C, D>
25where
26 C: Circuit,
27 D: Checkpoint
28 + AddByRef
29 + AddAssignByRef
30 + Clone
31 + Eq
32 + HasZero
33 + SizeOf
34 + NumEntries
35 + 'static,
36{
37 /// Integrate the input stream.
38 ///
39 /// Computes the sum of values in the input stream.
40 /// The first output value is the first input value, the second output
41 /// value is the sum of the first two inputs, and so on.
42 ///
43 /// # Examples
44 ///
45 /// ```
46 /// # use dbsp::{
47 /// # operator::Generator,
48 /// # Circuit, RootCircuit,
49 /// # };
50 /// let circuit = RootCircuit::build(move |circuit| {
51 /// // Generate a stream of 1's.
52 /// let stream = circuit.add_source(Generator::new(|| 1));
53 /// stream.inspect(move |n| eprintln!("{n}"));
54 /// // Integrate the stream.
55 /// let integral = stream.integrate();
56 /// integral.inspect(move |n| eprintln!("{n}"));
57 /// let mut counter1 = 0;
58 /// eprintln!("{counter1}");
59 /// integral.inspect(move |n| {
60 /// counter1 += 1;
61 /// assert_eq!(*n, counter1)
62 /// });
63 /// let mut counter2 = 0;
64 /// integral.delay().inspect(move |n| {
65 /// assert_eq!(*n, counter2);
66 /// counter2 += 1;
67 /// });
68 /// Ok(())
69 /// })
70 /// .unwrap()
71 /// .0;
72 ///
73 /// for _ in 0..5 {
74 /// circuit.transaction().unwrap();
75 /// }
76 /// ```
77 ///
78 /// The above example generates the following input/output mapping:
79 ///
80 /// ```text
81 /// input: 1, 1, 1, 1, 1, ...
82 /// output: 1, 2, 3, 4, 5, ...
83 /// ```
84 #[track_caller]
85 pub fn integrate(&self) -> Stream<C, D> {
86 self.circuit()
87 .cache_get_or_insert_with(IntegralId::new(self.stream_id()), || {
88 // Integration circuit:
89 // ```
90 // input
91 // ┌─────────────────►
92 // │
93 // │ ┌───┐ current
94 // ──┴───►│ ├────────►
95 // │ + │
96 // ┌───►│ ├────┐
97 // │ └───┘ │
98 // │ │
99 // │ ┌───┐ │
100 // │ │ │ │
101 // └────┤z-1├────┘
102 // │ │
103 // └───┴────────►
104 // delayed
105 // export
106 // ```
107 self.circuit().region("integrate", || {
108 let feedback = DelayedFeedback::new(self.circuit());
109 let integral = self.circuit().add_binary_operator_with_preference(
110 <Plus<D>>::new(),
111 (
112 feedback.stream(),
113 OwnershipPreference::STRONGLY_PREFER_OWNED,
114 ),
115 (self, OwnershipPreference::PREFER_OWNED),
116 );
117 feedback.connect(&integral);
118
119 self.circuit()
120 .cache_insert(DifferentiateId::new(integral.stream_id()), self.clone());
121 integral
122 })
123 })
124 .clone()
125 }
126
127 /// Integrate stream of streams.
128 ///
129 /// Computes the sum of nested streams, i.e., rather than integrating values
130 /// in each nested stream, this function sums up entire input streams
131 /// across all parent timestamps, where the sum of streams is defined as
132 /// a stream of point-wise sums of their elements: `integral[i,j] =
133 /// sum(input[k,j]), k<=i`, where `stream[i,j]` is the value of `stream`
134 /// at time `[i,j]`, `i` is the parent timestamp, and `j` is the child
135 /// timestamp.
136 ///
137 /// Yields the sum element-by-element as the input stream is fed to the
138 /// integral.
139 ///
140 /// # Examples
141 ///
142 /// Input stream (one row per parent timestamps):
143 ///
144 /// ```text
145 /// 1 2 3 4
146 /// 1 1 1 1 1
147 /// 2 2 2 0 0
148 /// ```
149 ///
150 /// Integral:
151 ///
152 /// ```text
153 /// 1 2 3 4
154 /// 2 3 4 5 1
155 /// 4 5 6 5 1
156 /// ```
157 #[track_caller]
158 pub fn integrate_nested(&self) -> Stream<C, D> {
159 self.circuit()
160 .cache_get_or_insert_with(NestedIntegralId::new(self.stream_id()), || {
161 self.circuit().region("integrate_nested", || {
162 let feedback = DelayedNestedFeedback::new(self.circuit());
163 let integral = self.circuit().add_binary_operator_with_preference(
164 Plus::new(),
165 (
166 feedback.stream(),
167 OwnershipPreference::STRONGLY_PREFER_OWNED,
168 ),
169 (self, OwnershipPreference::PREFER_OWNED),
170 );
171 feedback.connect(&integral);
172 integral
173 })
174 })
175 .clone()
176 }
177}
178
179impl<C, T, K, V, R, B> Stream<ChildCircuit<C, T>, TypedBatch<K, V, R, B>>
180where
181 C: Clone + 'static,
182 T: Timestamp,
183 K: DBData + Erase<B::Key>,
184 V: DBData + Erase<B::Val>,
185 R: DBWeight + Erase<B::R>,
186 B: DynIndexedZSet + Checkpoint,
187{
188 /// Integrate the input stream, updating the output once per clock tick.
189 pub fn accumulate_integrate(&self) -> Stream<ChildCircuit<C, T>, TypedBatch<K, V, R, B>> {
190 self.circuit()
191 .non_incremental(self, |_child_circuit, stream| Ok(stream.integrate()))
192 .unwrap()
193 }
194}
195
196#[cfg(test)]
197mod test {
198 use crate::{
199 Circuit, RootCircuit, ZWeight,
200 algebra::HasZero,
201 monitor::TraceMonitor,
202 operator::{DelayedFeedback, Generator},
203 typed_batch::OrdZSet,
204 utils::Tup2,
205 zset,
206 };
207
208 #[test]
209 fn scalar_integrate() {
210 let circuit = RootCircuit::build(move |circuit| {
211 let source = circuit.add_source(Generator::new(|| 1));
212 let mut counter = 0;
213 source.integrate().inspect(move |n| {
214 counter += 1;
215 assert_eq!(*n, counter);
216 });
217 Ok(())
218 })
219 .unwrap()
220 .0;
221
222 for _ in 0..100 {
223 circuit.transaction().unwrap();
224 }
225 }
226
227 #[test]
228 fn zset_integrate() {
229 let circuit = RootCircuit::build(move |circuit| {
230 let mut counter1: u64 = 0;
231 let mut s = <OrdZSet<u64>>::zero();
232 let source = circuit.add_source(Generator::new(move || {
233 let res = s.clone();
234 s = s.merge(&zset! { counter1 => 1});
235 counter1 += 1;
236 res
237 }));
238
239 let integral = source.integrate();
240 let mut counter2 = 0;
241 integral.inspect(move |s| {
242 let mut batch = Vec::with_capacity(counter2);
243 for i in 0..counter2 {
244 batch.push(Tup2(Tup2(i as u64, ()), (counter2 - i) as ZWeight));
245 }
246 assert_eq!(s, &<OrdZSet<_>>::from_tuples((), batch));
247 counter2 += 1;
248 });
249 let mut counter3 = 0;
250 integral.delay().inspect(move |s| {
251 let mut batch = Vec::with_capacity(counter2);
252 for i in 1..counter3 {
253 batch.push(Tup2(Tup2((i - 1) as u64, ()), (counter3 - i) as ZWeight));
254 }
255 assert_eq!(s, &<OrdZSet<_>>::from_tuples((), batch));
256 counter3 += 1;
257 });
258 Ok(())
259 })
260 .unwrap()
261 .0;
262
263 for _ in 0..100 {
264 circuit.transaction().unwrap();
265 }
266 }
267
268 /// ```text
269 /// ┌───────────────────────────────────────────────────────────────────────────────────┐
270 /// │ │
271 /// │ 3,2,1,0,0,0 3,2,1,0, │
272 /// │ 4,3,2,1,0,0 7,5,3,1,0, │
273 /// │ ┌───┐ 2,1,0,0,0,0 9,6,3,1,0, │
274 /// 3,4,2,5 │ │ │ 5,4,3,2,1,0 14,10,6,3,1,0 │ 6,16,19,34
275 /// ───────────┼──►delta0──────────►│ + ├──────────┬─────►integrate_nested───────────────integrate─┼─────────────►
276 /// │ 3,0,0,0,0 │ │ │ │
277 /// │ 4,0,0,0,0 └───┘ ▼ │
278 /// │ 2,0,0,0,0 ▲ ┌──┐ ┌───┐ │
279 /// │ 5,0,0,0,0 └────┤-1│◄──|z-1| │
280 /// │ └──┘ └───┘ │
281 /// │ │
282 /// └───────────────────────────────────────────────────────────────────────────────────┘
283 /// ```
284 #[test]
285 fn scalar_integrate_nested() {
286 let circuit = RootCircuit::build(move |circuit| {
287 TraceMonitor::new_panic_on_error().attach(circuit, "monitor");
288
289 let mut input = vec![3, 4, 2, 5].into_iter();
290
291 let mut expected_counters =
292 vec![3, 2, 1, 0, 4, 3, 2, 1, 0, 2, 1, 0, 0, 0, 5, 4, 3, 2, 1, 0].into_iter();
293 let mut expected_integrals =
294 vec![3, 2, 1, 0, 7, 5, 3, 1, 0, 9, 6, 3, 1, 0, 14, 10, 6, 3, 1, 0].into_iter();
295 let mut expected_outer_integrals = vec![6, 16, 19, 34].into_iter();
296
297 let source = circuit.add_source(Generator::new(move || input.next().unwrap()));
298 let integral = circuit
299 .iterate_with_condition(|child| {
300 let source = source.delta0(child);
301 let feedback = DelayedFeedback::new(child);
302 let plus =
303 source.plus(&feedback.stream().apply(|&n| if n > 0 { n - 1 } else { n }));
304 plus.inspect(move |n| assert_eq!(*n, expected_counters.next().unwrap()));
305 feedback.connect(&plus);
306 let integral = plus.integrate_nested();
307 integral.inspect(move |n| assert_eq!(*n, expected_integrals.next().unwrap()));
308 Ok((
309 integral.condition(|n| *n == 0),
310 integral.apply(|rc| *rc).integrate().export(),
311 ))
312 })
313 .unwrap();
314 integral.inspect(move |n| assert_eq!(*n, expected_outer_integrals.next().unwrap()));
315 Ok(())
316 })
317 .unwrap()
318 .0;
319
320 for _ in 0..4 {
321 circuit.transaction().unwrap();
322 }
323 }
324}