Skip to main content

palimpsest_dataflow/operators/
iterate.rs

//! Iterative application of a differential dataflow fragment.
//!
//! The `iterate` operator takes as an argument a closure from a differential dataflow collection
//! to a collection of the same type. The output collection is the result of applying this closure
//! an unbounded number of times.
//!
//! The implementation of `iterate` does not directly apply the closure, but rather establishes an
//! iterative timely dataflow subcomputation, in which differences circulate until they dissipate
//! (indicating that the computation has reached a fixed point), or until some number of
//! iterations have passed.
//!
//! **Note**: The dataflow assembled by `iterate` does not automatically insert `consolidate` for
//! you. This means that either (i) you should insert one yourself, (ii) you should be certain that
//! all paths from the input to the output of the loop involve consolidation, or (iii) you should
//! be worried that logically cancelable differences may circulate indefinitely.
//!
//! # Details
//!
//! The `iterate` method is written using a `Variable`, which lets you define your own iterative
//! computations when `iterate` itself is not sufficient. This can happen when you have two
//! collections that should evolve simultaneously, or when you would like to rotate your loop and
//! return an intermediate result.
//!
//! Using `Variable` requires more explicit arrangement of your computation, but isn't much more
//! complicated. You must define a new variable from an existing stream (its initial value), and
//! then set it to be a function of this variable (and perhaps other collections and variables).
//!
//! A `Variable` dereferences to a `Collection`, the one corresponding to its value in each iteration,
//! and it can be used in most situations where a collection can be used. The act of setting a
//! `Variable` consumes it and returns the corresponding `Collection`, preventing you from setting
//! it multiple times.
32
33use std::fmt::Debug;
34use std::ops::Deref;
35
36use timely::order::Product;
37use timely::progress::Timestamp;
38use timely::Container;
39
40use timely::dataflow::operators::feedback::Handle;
41use timely::dataflow::operators::{ConnectLoop, Feedback};
42use timely::dataflow::scopes::child::Iterative;
43use timely::dataflow::*;
44
45use crate::difference::{Abelian, Semigroup};
46use crate::lattice::Lattice;
47use crate::{Collection, Data, VecCollection};
48
49/// An extension trait for the `iterate` method.
50pub trait Iterate<G: Scope<Timestamp: Lattice>, D: Data, R: Semigroup> {
51    /// Iteratively apply `logic` to the source collection until convergence.
52    ///
53    /// Importantly, this method does not automatically consolidate results.
54    /// It may be important to conclude with `consolidate()` to ensure that
55    /// logically empty collections that contain cancelling records do not
56    /// result in non-termination. Operators like `reduce`, `distinct`, and
57    /// `count` also perform consolidation, and are safe to conclude with.
58    ///
59    /// # Examples
60    ///
61    /// ```
62    /// use palimpsest_dataflow::input::Input;
63    /// use palimpsest_dataflow::operators::Iterate;
64    ///
65    /// ::timely::example(|scope| {
66    ///
67    ///     scope.new_collection_from(1 .. 10u32).1
68    ///          .iterate(|values| {
69    ///              values.map(|x| if x % 2 == 0 { x/2 } else { x })
70    ///                    .consolidate()
71    ///          });
72    /// });
73    /// ```
74    fn iterate<F>(&self, logic: F) -> VecCollection<G, D, R>
75    where
76        for<'a> F: FnOnce(
77            &VecCollection<Iterative<'a, G, u64>, D, R>,
78        ) -> VecCollection<Iterative<'a, G, u64>, D, R>;
79}
80
81impl<G: Scope<Timestamp: Lattice>, D: Ord + Data + Debug, R: Abelian + 'static> Iterate<G, D, R>
82    for VecCollection<G, D, R>
83{
84    fn iterate<F>(&self, logic: F) -> VecCollection<G, D, R>
85    where
86        for<'a> F: FnOnce(
87            &VecCollection<Iterative<'a, G, u64>, D, R>,
88        ) -> VecCollection<Iterative<'a, G, u64>, D, R>,
89    {
90        self.inner.scope().scoped("Iterate", |subgraph| {
91            // create a new variable, apply logic, bind variable, return.
92            //
93            // this could be much more succinct if we returned the collection
94            // wrapped by `variable`, but it also results in substantially more
95            // diffs produced; `result` is post-consolidation, and means fewer
96            // records are yielded out of the loop.
97            let variable =
98                Variable::new_from(self.enter(subgraph), Product::new(Default::default(), 1));
99            let result = logic(&variable);
100            variable.set(&result);
101            result.leave()
102        })
103    }
104}
105
106impl<G: Scope<Timestamp: Lattice>, D: Ord + Data + Debug, R: Semigroup + 'static> Iterate<G, D, R>
107    for G
108{
109    fn iterate<F>(&self, logic: F) -> VecCollection<G, D, R>
110    where
111        for<'a> F: FnOnce(
112            &VecCollection<Iterative<'a, G, u64>, D, R>,
113        ) -> VecCollection<Iterative<'a, G, u64>, D, R>,
114    {
115        // TODO: This makes me think we have the wrong ownership pattern here.
116        let mut clone = self.clone();
117        clone.scoped("Iterate", |subgraph| {
118            // create a new variable, apply logic, bind variable, return.
119            //
120            // this could be much more succinct if we returned the collection
121            // wrapped by `variable`, but it also results in substantially more
122            // diffs produced; `result` is post-consolidation, and means fewer
123            // records are yielded out of the loop.
124            let variable = SemigroupVariable::new(subgraph, Product::new(Default::default(), 1));
125            let result = logic(&variable);
126            variable.set(&result);
127            result.leave()
128        })
129    }
130}
131
132/// A recursively defined collection.
133///
134/// The `Variable` struct allows differential dataflow programs requiring more sophisticated
135/// iterative patterns than singly recursive iteration. For example: in mutual recursion two
136/// collections evolve simultaneously.
137///
138/// # Examples
139///
140/// The following example is equivalent to the example for the `Iterate` trait.
141///
142/// ```
143/// use timely::order::Product;
144/// use timely::dataflow::Scope;
145///
146/// use palimpsest_dataflow::input::Input;
147/// use palimpsest_dataflow::operators::iterate::Variable;
148///
149/// ::timely::example(|scope| {
150///
151///     let numbers = scope.new_collection_from(1 .. 10u32).1;
152///
153///     scope.iterative::<u64,_,_>(|nested| {
154///         let summary = Product::new(Default::default(), 1);
155///         let variable = Variable::new_from(numbers.enter(nested), summary);
156///         let result = variable.map(|x| if x % 2 == 0 { x/2 } else { x })
157///                              .consolidate();
158///         variable.set(&result)
159///                 .leave()
160///     });
161/// })
162/// ```
163pub struct Variable<G, C>
164where
165    G: Scope<Timestamp: Lattice>,
166    C: Container,
167{
168    collection: Collection<G, C>,
169    feedback: Handle<G, C>,
170    source: Option<Collection<G, C>>,
171    step: <G::Timestamp as Timestamp>::Summary,
172}
173
174/// A `Variable` specialized to a vector container of update triples (data, time, diff).
175pub type VecVariable<G, D, R> = Variable<G, Vec<(D, <G as ScopeParent>::Timestamp, R)>>;
176
177impl<G, C: Container> Variable<G, C>
178where
179    G: Scope<Timestamp: Lattice>,
180    C: crate::collection::containers::Negate
181        + crate::collection::containers::ResultsIn<<G::Timestamp as Timestamp>::Summary>,
182{
183    /// Creates a new initially empty `Variable`.
184    ///
185    /// This method produces a simpler dataflow graph than `new_from`, and should
186    /// be used whenever the variable has an empty input.
187    pub fn new(scope: &mut G, step: <G::Timestamp as Timestamp>::Summary) -> Self {
188        let (feedback, updates) = scope.feedback(step.clone());
189        let collection = Collection::<G, C>::new(updates);
190        Self {
191            collection,
192            feedback,
193            source: None,
194            step,
195        }
196    }
197
198    /// Creates a new `Variable` from a supplied `source` stream.
199    pub fn new_from(source: Collection<G, C>, step: <G::Timestamp as Timestamp>::Summary) -> Self {
200        let (feedback, updates) = source.inner.scope().feedback(step.clone());
201        let collection = Collection::<G, C>::new(updates).concat(&source);
202        Variable {
203            collection,
204            feedback,
205            source: Some(source),
206            step,
207        }
208    }
209
210    /// Set the definition of the `Variable` to a collection.
211    ///
212    /// This method binds the `Variable` to be equal to the supplied collection,
213    /// which may be recursively defined in terms of the variable itself.
214    pub fn set(self, result: &Collection<G, C>) -> Collection<G, C> {
215        let mut in_result = result.clone();
216        if let Some(source) = &self.source {
217            in_result = in_result.concat(&source.negate());
218        }
219        self.set_concat(&in_result)
220    }
221
222    /// Set the definition of the `Variable` to a collection concatenated to `self`.
223    ///
224    /// This method is a specialization of `set` which has the effect of concatenating
225    /// `result` and `self` before calling `set`. This method avoids some dataflow
226    /// complexity related to retracting the initial input, and will do less work in
227    /// that case.
228    ///
229    /// This behavior can also be achieved by using `new` to create an empty initial
230    /// collection, and then using `self.set(self.concat(result))`.
231    pub fn set_concat(self, result: &Collection<G, C>) -> Collection<G, C> {
232        let step = self.step;
233        result.results_in(step).inner.connect_loop(self.feedback);
234
235        self.collection
236    }
237}
238
239impl<G: Scope<Timestamp: Lattice>, C: Container> Deref for Variable<G, C> {
240    type Target = Collection<G, C>;
241    fn deref(&self) -> &Self::Target {
242        &self.collection
243    }
244}
245
246/// A recursively defined collection that only "grows".
247///
248/// `SemigroupVariable` is a weakening of `Variable` to allow difference types
249/// that do not implement `Abelian` and only implement `Semigroup`. This means
250/// that it can be used in settings where the difference type does not support
251/// negation.
252pub struct SemigroupVariable<G, C>
253where
254    G: Scope<Timestamp: Lattice>,
255    C: Container,
256{
257    collection: Collection<G, C>,
258    feedback: Handle<G, C>,
259    step: <G::Timestamp as Timestamp>::Summary,
260}
261
262impl<G, C: Container> SemigroupVariable<G, C>
263where
264    G: Scope<Timestamp: Lattice>,
265    C: crate::collection::containers::ResultsIn<<G::Timestamp as Timestamp>::Summary>,
266{
267    /// Creates a new initially empty `SemigroupVariable`.
268    pub fn new(scope: &mut G, step: <G::Timestamp as Timestamp>::Summary) -> Self {
269        let (feedback, updates) = scope.feedback(step.clone());
270        let collection = Collection::<G, C>::new(updates);
271        SemigroupVariable {
272            collection,
273            feedback,
274            step,
275        }
276    }
277
278    /// Adds a new source of data to `self`.
279    pub fn set(self, result: &Collection<G, C>) -> Collection<G, C> {
280        let step = self.step;
281        result.results_in(step).inner.connect_loop(self.feedback);
282
283        self.collection
284    }
285}
286
287impl<G: Scope, C: Container> Deref for SemigroupVariable<G, C>
288where
289    G::Timestamp: Lattice,
290{
291    type Target = Collection<G, C>;
292    fn deref(&self) -> &Self::Target {
293        &self.collection
294    }
295}