Skip to main content

dbsp/operator/
integrate.rs

1//! Integration operators.
2
3use crate::circuit::checkpointer::Checkpoint;
4use crate::circuit::circuit_builder::StreamId;
5use crate::dynamic::Erase;
6use crate::typed_batch::TypedBatch;
7use crate::{ChildCircuit, DBData, DBWeight, Timestamp};
8use crate::{
9    NumEntries,
10    algebra::{AddAssignByRef, AddByRef, HasZero, IndexedZSet as DynIndexedZSet},
11    circuit::{Circuit, OwnershipPreference, Stream},
12    circuit_cache_key,
13    operator::{
14        Plus,
15        differentiate::DifferentiateId,
16        z1::{DelayedFeedback, DelayedNestedFeedback},
17    },
18};
19use size_of::SizeOf;
20
// Cache key under which `Stream::integrate` stores the integral of the stream
// identified by the given `StreamId`.
circuit_cache_key!(IntegralId<C, D>(StreamId => Stream<C, D>));
// Cache key under which `Stream::integrate_nested` stores the nested integral
// of the stream identified by the given `StreamId`.
circuit_cache_key!(NestedIntegralId<C, D>(StreamId => Stream<C, D>));
23
24impl<C, D> Stream<C, D>
25where
26    C: Circuit,
27    D: Checkpoint
28        + AddByRef
29        + AddAssignByRef
30        + Clone
31        + Eq
32        + HasZero
33        + SizeOf
34        + NumEntries
35        + 'static,
36{
37    /// Integrate the input stream.
38    ///
39    /// Computes the sum of values in the input stream.
40    /// The first output value is the first input value, the second output
41    /// value is the sum of the first two inputs, and so on.
42    ///
43    /// # Examples
44    ///
45    /// ```
46    /// # use dbsp::{
47    /// #     operator::Generator,
48    /// #     Circuit, RootCircuit,
49    /// # };
50    /// let circuit = RootCircuit::build(move |circuit| {
51    ///     // Generate a stream of 1's.
52    ///     let stream = circuit.add_source(Generator::new(|| 1));
53    ///     stream.inspect(move |n| eprintln!("{n}"));
54    ///     // Integrate the stream.
55    ///     let integral = stream.integrate();
56    ///     integral.inspect(move |n| eprintln!("{n}"));
57    ///     let mut counter1 = 0;
58    ///     eprintln!("{counter1}");
59    ///     integral.inspect(move |n| {
60    ///         counter1 += 1;
61    ///         assert_eq!(*n, counter1)
62    ///     });
63    ///     let mut counter2 = 0;
64    ///     integral.delay().inspect(move |n| {
65    ///         assert_eq!(*n, counter2);
66    ///         counter2 += 1;
67    ///     });
68    ///     Ok(())
69    /// })
70    /// .unwrap()
71    /// .0;
72    ///
73    /// for _ in 0..5 {
74    ///     circuit.transaction().unwrap();
75    /// }
76    /// ```
77    ///
78    /// The above example generates the following input/output mapping:
79    ///
80    /// ```text
81    /// input:  1, 1, 1, 1, 1, ...
82    /// output: 1, 2, 3, 4, 5, ...
83    /// ```
84    #[track_caller]
85    pub fn integrate(&self) -> Stream<C, D> {
86        self.circuit()
87            .cache_get_or_insert_with(IntegralId::new(self.stream_id()), || {
88                // Integration circuit:
89                // ```
90                //              input
91                //   ┌─────────────────►
92                //   │
93                //   │    ┌───┐ current
94                // ──┴───►│   ├────────►
95                //        │ + │
96                //   ┌───►│   ├────┐
97                //   │    └───┘    │
98                //   │             │
99                //   │    ┌───┐    │
100                //   │    │   │    │
101                //   └────┤z-1├────┘
102                //        │   │
103                //        └───┴────────►
104                //              delayed
105                //              export
106                // ```
107                self.circuit().region("integrate", || {
108                    let feedback = DelayedFeedback::new(self.circuit());
109                    let integral = self.circuit().add_binary_operator_with_preference(
110                        <Plus<D>>::new(),
111                        (
112                            feedback.stream(),
113                            OwnershipPreference::STRONGLY_PREFER_OWNED,
114                        ),
115                        (self, OwnershipPreference::PREFER_OWNED),
116                    );
117                    feedback.connect(&integral);
118
119                    self.circuit()
120                        .cache_insert(DifferentiateId::new(integral.stream_id()), self.clone());
121                    integral
122                })
123            })
124            .clone()
125    }
126
127    /// Integrate stream of streams.
128    ///
129    /// Computes the sum of nested streams, i.e., rather than integrating values
130    /// in each nested stream, this function sums up entire input streams
131    /// across all parent timestamps, where the sum of streams is defined as
132    /// a stream of point-wise sums of their elements: `integral[i,j] =
133    /// sum(input[k,j]), k<=i`, where `stream[i,j]` is the value of `stream`
134    /// at time `[i,j]`, `i` is the parent timestamp, and `j` is the child
135    /// timestamp.
136    ///
137    /// Yields the sum element-by-element as the input stream is fed to the
138    /// integral.
139    ///
140    /// # Examples
141    ///
142    /// Input stream (one row per parent timestamps):
143    ///
144    /// ```text
145    /// 1 2 3 4
146    /// 1 1 1 1 1
147    /// 2 2 2 0 0
148    /// ```
149    ///
150    /// Integral:
151    ///
152    /// ```text
153    /// 1 2 3 4
154    /// 2 3 4 5 1
155    /// 4 5 6 5 1
156    /// ```
157    #[track_caller]
158    pub fn integrate_nested(&self) -> Stream<C, D> {
159        self.circuit()
160            .cache_get_or_insert_with(NestedIntegralId::new(self.stream_id()), || {
161                self.circuit().region("integrate_nested", || {
162                    let feedback = DelayedNestedFeedback::new(self.circuit());
163                    let integral = self.circuit().add_binary_operator_with_preference(
164                        Plus::new(),
165                        (
166                            feedback.stream(),
167                            OwnershipPreference::STRONGLY_PREFER_OWNED,
168                        ),
169                        (self, OwnershipPreference::PREFER_OWNED),
170                    );
171                    feedback.connect(&integral);
172                    integral
173                })
174            })
175            .clone()
176    }
177}
178
179impl<C, T, K, V, R, B> Stream<ChildCircuit<C, T>, TypedBatch<K, V, R, B>>
180where
181    C: Clone + 'static,
182    T: Timestamp,
183    K: DBData + Erase<B::Key>,
184    V: DBData + Erase<B::Val>,
185    R: DBWeight + Erase<B::R>,
186    B: DynIndexedZSet + Checkpoint,
187{
188    /// Integrate the input stream, updating the output once per clock tick.
189    pub fn accumulate_integrate(&self) -> Stream<ChildCircuit<C, T>, TypedBatch<K, V, R, B>> {
190        self.circuit()
191            .non_incremental(self, |_child_circuit, stream| Ok(stream.integrate()))
192            .unwrap()
193    }
194}
195
#[cfg(test)]
mod test {
    use crate::{
        Circuit, RootCircuit, ZWeight,
        algebra::HasZero,
        monitor::TraceMonitor,
        operator::{DelayedFeedback, Generator},
        typed_batch::OrdZSet,
        utils::Tup2,
        zset,
    };

    /// Integrating a constant stream of 1's must yield 1, 2, 3, ...
    #[test]
    fn scalar_integrate() {
        let circuit = RootCircuit::build(move |circuit| {
            let source = circuit.add_source(Generator::new(|| 1));
            let mut counter = 0;
            source.integrate().inspect(move |n| {
                counter += 1;
                assert_eq!(*n, counter);
            });
            Ok(())
        })
        .unwrap()
        .0;

        for _ in 0..100 {
            circuit.transaction().unwrap();
        }
    }

    /// Integrates a stream of growing Z-sets and checks both the integral and
    /// its delayed version against expected values computed in closed form.
    #[test]
    fn zset_integrate() {
        let circuit = RootCircuit::build(move |circuit| {
            // At tick `t` (0-based) the source emits `{0 => 1, ..., t-1 => 1}`.
            let mut counter1: u64 = 0;
            let mut s = <OrdZSet<u64>>::zero();
            let source = circuit.add_source(Generator::new(move || {
                let res = s.clone();
                s = s.merge(&zset! { counter1 => 1});
                counter1 += 1;
                res
            }));

            let integral = source.integrate();

            // At tick `t`, element `i < t` has been emitted by inputs
            // `i+1..=t`, so its weight in the integral is `t - i`.
            let mut counter2: usize = 0;
            integral.inspect(move |s| {
                let batch: Vec<_> = (0..counter2)
                    .map(|i| Tup2(Tup2(i as u64, ()), (counter2 - i) as ZWeight))
                    .collect();
                assert_eq!(s, &<OrdZSet<_>>::from_tuples((), batch));
                counter2 += 1;
            });

            // The delayed integral lags one tick behind.  The expected batch
            // is built from this closure's own counter: `counter2` was moved
            // (copied) into the closure above and must not be used for sizing
            // here.
            let mut counter3: usize = 0;
            integral.delay().inspect(move |s| {
                let batch: Vec<_> = (1..counter3)
                    .map(|i| Tup2(Tup2((i - 1) as u64, ()), (counter3 - i) as ZWeight))
                    .collect();
                assert_eq!(s, &<OrdZSet<_>>::from_tuples((), batch));
                counter3 += 1;
            });
            Ok(())
        })
        .unwrap()
        .0;

        for _ in 0..100 {
            circuit.transaction().unwrap();
        }
    }

    /// Exercises `integrate_nested` followed by `integrate` inside an iterated
    /// child circuit.  Each parent input starts a countdown in the child
    /// circuit; the expected values at every point are shown in the diagram.
    ///
    /// ```text
    ///            ┌───────────────────────────────────────────────────────────────────────────────────┐
    ///            │                                                                                   │
    ///            │                           3,2,1,0,0,0                     3,2,1,0,                │
    ///            │                           4,3,2,1,0,0                     7,5,3,1,0,              │
    ///            │                    ┌───┐  2,1,0,0,0,0                     9,6,3,1,0,              │
    ///  3,4,2,5   │                    │   │  5,4,3,2,1,0                     14,10,6,3,1,0           │ 6,16,19,34
    /// ───────────┼──►delta0──────────►│ + ├──────────┬─────►integrate_nested───────────────integrate─┼─────────────►
    ///            │          3,0,0,0,0 │   │          │                                               │
    ///            │          4,0,0,0,0 └───┘          ▼                                               │
    ///            │          2,0,0,0,0   ▲    ┌──┐   ┌───┐                                            │
    ///            │          5,0,0,0,0   └────┤-1│◄──|z-1|                                            │
    ///            │                           └──┘   └───┘                                            │
    ///            │                                                                                   │
    ///            └───────────────────────────────────────────────────────────────────────────────────┘
    /// ```
    #[test]
    fn scalar_integrate_nested() {
        let circuit = RootCircuit::build(move |circuit| {
            TraceMonitor::new_panic_on_error().attach(circuit, "monitor");

            let mut input = vec![3, 4, 2, 5].into_iter();

            // Expected values, flattened over all child-circuit iterations.
            let mut expected_counters =
                vec![3, 2, 1, 0, 4, 3, 2, 1, 0, 2, 1, 0, 0, 0, 5, 4, 3, 2, 1, 0].into_iter();
            let mut expected_integrals =
                vec![3, 2, 1, 0, 7, 5, 3, 1, 0, 9, 6, 3, 1, 0, 14, 10, 6, 3, 1, 0].into_iter();
            let mut expected_outer_integrals = vec![6, 16, 19, 34].into_iter();

            let source = circuit.add_source(Generator::new(move || input.next().unwrap()));
            let integral = circuit
                .iterate_with_condition(|child| {
                    let source = source.delta0(child);
                    let feedback = DelayedFeedback::new(child);
                    // Countdown: add the previous value decremented by one,
                    // saturating at zero.
                    let plus =
                        source.plus(&feedback.stream().apply(|&n| if n > 0 { n - 1 } else { n }));
                    plus.inspect(move |n| assert_eq!(*n, expected_counters.next().unwrap()));
                    feedback.connect(&plus);
                    let integral = plus.integrate_nested();
                    integral.inspect(move |n| assert_eq!(*n, expected_integrals.next().unwrap()));
                    // Iterate until the nested integral reaches zero; export
                    // the integral of the nested integral to the parent.
                    Ok((
                        integral.condition(|n| *n == 0),
                        integral.apply(|rc| *rc).integrate().export(),
                    ))
                })
                .unwrap();
            integral.inspect(move |n| assert_eq!(*n, expected_outer_integrals.next().unwrap()));
            Ok(())
        })
        .unwrap()
        .0;

        for _ in 0..4 {
            circuit.transaction().unwrap();
        }
    }
}