//! Iterative application of a differential dataflow fragment.
//!
//! The `iterate` operator takes as an argument a closure from a differential dataflow collection
//! to a collection of the same type. The output collection is the result of applying this closure
//! an unbounded number of times.
//!
//! The implementation of `iterate` does not directly apply the closure, but rather establishes an
//! iterative timely dataflow subcomputation, in which differences circulate until they dissipate
//! (indicating that the computation has reached a fixed point), or until some number of iterations
//! have passed.
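//!
//! As a rough illustration of these semantics, and not of how differential dataflow computes
//! them, the sketch below models a collection as a multiset in a `BTreeMap` and recomputes the
//! whole collection each round. It stops when the collection no longer changes or after
//! `max_rounds` rounds. The names `Multiset`, `fixed_point`, `halve_evens`, and `halving_example`
//! are hypothetical and exist only for this illustration; `iterate` itself circulates only
//! differences rather than whole collections.
//!
//! ```
//! use std::collections::BTreeMap;
//!
//! // Hypothetical model: each record maps to its accumulated multiplicity.
//! type Multiset = BTreeMap<u32, i64>;
//!
//! // Applies `logic` until the collection stops changing or `max_rounds` rounds have passed,
//! // returning the final collection and the number of rounds that changed it.
//! fn fixed_point(
//!     mut current: Multiset,
//!     logic: impl Fn(&Multiset) -> Multiset,
//!     max_rounds: usize,
//! ) -> (Multiset, usize) {
//!     for round in 0..max_rounds {
//!         let next = logic(&current);
//!         if next == current {
//!             return (current, round);
//!         }
//!         current = next;
//!     }
//!     (current, max_rounds)
//! }
//!
//! // The logic used in the `Iterate` example: halve even numbers, keep odd numbers.
//! // Records whose multiplicities sum to zero are dropped, mimicking `consolidate()`.
//! fn halve_evens(input: &Multiset) -> Multiset {
//!     let mut output = Multiset::new();
//!     for (&x, &count) in input {
//!         let y = if x % 2 == 0 { x / 2 } else { x };
//!         *output.entry(y).or_insert(0) += count;
//!     }
//!     output.retain(|_, count| *count != 0);
//!     output
//! }
//!
//! // Runs the model on the numbers 1 through 9, each with multiplicity one.
//! fn halving_example() -> (Multiset, usize) {
//!     let input: Multiset = (1..10u32).map(|x| (x, 1)).collect();
//!     fixed_point(input, halve_evens, 100)
//! }
//! ```
//!
//! In this model `halving_example()` returns the multiset `{1: 4, 3: 2, 5: 1, 7: 1, 9: 1}`
//! after three rounds that change the collection.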
//!
//! **Note**: The dataflow assembled by `iterate` does not automatically insert `consolidate` for
//! you. This means that either (i) you should insert one yourself, (ii) you should be certain that
//! all paths from the input to the output of the loop involve consolidation, or (iii) you should
//! be worried that logically cancelable differences may circulate indefinitely.
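//!
//! The toy model below shows why this matters. It uses only `std` and the hypothetical names
//! `consolidate` and `rounds_until_quiet`. A pair of updates that cancel is fed back unchanged
//! each round. Without consolidation the list of updates never becomes empty, so the loop stops
//! only at its round limit. With consolidation the pair cancels in the first round.
//!
//! ```
//! use std::collections::BTreeMap;
//!
//! // Hypothetical helper: sums the differences of updates with equal data and drops
//! // any data whose differences sum to zero.
//! fn consolidate(updates: Vec<(u32, i64)>) -> Vec<(u32, i64)> {
//!     let mut sums: BTreeMap<u32, i64> = BTreeMap::new();
//!     for (data, diff) in updates {
//!         *sums.entry(data).or_insert(0) += diff;
//!     }
//!     sums.into_iter().filter(|&(_, diff)| diff != 0).collect()
//! }
//!
//! // Feeds `updates` back unchanged each round (the loop body is the identity) and reports
//! // the first round in which no updates remain, or `max_rounds` if that never happens.
//! fn rounds_until_quiet(
//!     mut updates: Vec<(u32, i64)>,
//!     consolidate_each_round: bool,
//!     max_rounds: usize,
//! ) -> usize {
//!     for round in 0..max_rounds {
//!         if consolidate_each_round {
//!             updates = consolidate(updates);
//!         }
//!         if updates.is_empty() {
//!             return round;
//!         }
//!     }
//!     max_rounds
//! }
//! ```
//!
//! With the cancelling pair `[(7, 1), (7, -1)]`, `rounds_until_quiet` returns `0` when
//! consolidating and `max_rounds` when not.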
//!
//! # Details
//!
//! The `iterate` method is written using a `Variable`, which lets you define your own iterative
//! computations when `iterate` itself is not sufficient. This can happen when you have two
//! collections that should evolve simultaneously, or when you would like to rotate your loop and
//! return an intermediate result.
//!
//! Using `Variable` requires more explicit arrangement of your computation, but isn't much more
//! complicated. You must define a new variable from an existing collection (its initial value), and
//! then set it to be a function of this variable (and perhaps other collections and variables).
//!
//! Creating a `Variable` also returns a `Collection` corresponding to its value in each iteration,
//! and this collection can be used in most situations where a collection can be used. Setting a
//! `Variable` consumes it, which prevents you from setting it multiple times.

use std::fmt::Debug;

use timely::Container;
use timely::progress::Timestamp;
use timely::order::Product;

use timely::dataflow::*;
use timely::dataflow::scopes::child::Iterative;
use timely::dataflow::operators::{Feedback, ConnectLoop};
use timely::dataflow::operators::feedback::Handle;

use crate::{Data, VecCollection, Collection};
use crate::difference::{Semigroup, Abelian};
use crate::lattice::Lattice;

/// An extension trait for the `iterate` method.
pub trait Iterate<G: Scope<Timestamp: Lattice>, D: Data, R: Semigroup> {
    /// Iteratively apply `logic` to the source collection until convergence.
    ///
    /// Importantly, this method does not automatically consolidate results.
    /// It may be important to conclude the closure you supply with `consolidate()` to ensure that
    /// logically empty collections that contain cancelling records do not result in non-termination.
    /// Operators like `reduce`, `distinct`, and `count` also perform consolidation, and are safe to conclude with.
    ///
    /// The closure is also passed a copy of the inner scope, to facilitate importing external collections.
    /// It can also be acquired by calling `.scope()` on the closure's collection argument, but the code
    /// can be awkward to write fluently.
    ///
    /// # Examples
    ///
    /// ```
    /// use differential_dataflow::input::Input;
    /// use differential_dataflow::operators::Iterate;
    ///
    /// ::timely::example(|scope| {
    ///
    ///     scope.new_collection_from(1 .. 10u32).1
    ///          .iterate(|_scope, values| {
    ///              values.map(|x| if x % 2 == 0 { x/2 } else { x })
    ///                    .consolidate()
    ///          });
    /// });
    /// ```
    fn iterate<F>(self, logic: F) -> VecCollection<G, D, R>
    where
        for<'a> F: FnOnce(Iterative<'a, G, u64>, VecCollection<Iterative<'a, G, u64>, D, R>)->VecCollection<Iterative<'a, G, u64>, D, R>;
}

impl<G: Scope<Timestamp: Lattice>, D: Ord+Data+Debug, R: Abelian+'static> Iterate<G, D, R> for VecCollection<G, D, R> {
    fn iterate<F>(self, logic: F) -> VecCollection<G, D, R>
    where
        for<'a> F: FnOnce(Iterative<'a, G, u64>, VecCollection<Iterative<'a, G, u64>, D, R>)->VecCollection<Iterative<'a, G, u64>, D, R>,
    {
        self.inner.scope().scoped("Iterate", |subgraph| {
            // create a new variable, apply logic, bind variable, return.
            //
            // this could be much more succinct if we returned the collection
            // wrapped by `variable`, but it also results in substantially more
            // diffs produced; `result` is post-consolidation, and means fewer
            // records are yielded out of the loop.
            let (variable, collection) = Variable::new_from(self.enter(subgraph), Product::new(Default::default(), 1));
            let result = logic(subgraph.clone(), collection);
            variable.set(result.clone());
            result.leave()
        })
    }
}

impl<G: Scope<Timestamp: Lattice>, D: Ord+Data+Debug, R: Semigroup+'static> Iterate<G, D, R> for G {
    fn iterate<F>(mut self, logic: F) -> VecCollection<G, D, R>
    where
        for<'a> F: FnOnce(Iterative<'a, G, u64>, VecCollection<Iterative<'a, G, u64>, D, R>)->VecCollection<Iterative<'a, G, u64>, D, R>,
    {
        self.scoped("Iterate", |subgraph| {
            // create a new variable, apply logic, bind variable, return.
            //
            // this could be much more succinct if we returned the collection
            // wrapped by `variable`, but it also results in substantially more
            // diffs produced; `result` is post-consolidation, and means fewer
            // records are yielded out of the loop.
            let (variable, collection) = Variable::new(subgraph, Product::new(Default::default(), 1));
            let result = logic(subgraph.clone(), collection);
            variable.set(result.clone());
            result.leave()
        })
    }
}

/// A recursively defined collection.
///
/// The `Variable` struct supports differential dataflow programs that require more sophisticated
/// iterative patterns than singly recursive iteration. For example, in mutual recursion two
/// collections evolve simultaneously.
///
/// # Examples
///
/// The following example is equivalent to the example for the `Iterate` trait.
///
/// ```
/// use timely::order::Product;
/// use timely::dataflow::Scope;
///
/// use differential_dataflow::input::Input;
/// use differential_dataflow::operators::iterate::Variable;
///
/// ::timely::example(|scope| {
///
///     let numbers = scope.new_collection_from(1 .. 10u32).1;
///
///     scope.iterative::<u64,_,_>(|nested| {
///         let summary = Product::new(Default::default(), 1);
///         let (variable, collection) = Variable::new_from(numbers.enter(nested), summary);
///         let result = collection.map(|x| if x % 2 == 0 { x/2 } else { x })
///                                .consolidate();
///         variable.set(result.clone());
///         result.leave()
///     });
/// })
/// ```
///
/// Variables support iterative patterns that can be both more flexible and more efficient.
///
/// Mutual recursion is when one defines multiple variables in the same iterative context,
/// and their definitions are not independent. For example, odd numbers and even numbers
/// can be determined from each other, iteratively.
///
/// ```
/// use timely::order::Product;
/// use timely::dataflow::Scope;
///
/// use differential_dataflow::input::Input;
/// use differential_dataflow::operators::iterate::Variable;
///
/// ::timely::example(|scope| {
///
///     let numbers = scope.new_collection_from(10 .. 20u32).1;
///
///     scope.iterative::<u64,_,_>(|nested| {
///         let summary = Product::new(Default::default(), 1);
///         let (even_v, even) = Variable::new_from(numbers.clone().enter(nested).filter(|x| x % 2 == 0), summary);
///         let (odds_v, odds) = Variable::new_from(numbers.clone().enter(nested).filter(|x| x % 2 == 1), summary);
///         odds_v.set(even.clone().filter(|x| *x > 0).map(|x| x-1).concat(odds.clone()).distinct());
///         even_v.set(odds.clone().filter(|x| *x > 0).map(|x| x-1).concat(even.clone()).distinct());
///     });
/// })
/// ```
///
/// Direct construction can be more efficient than `iterate` when you know a way to directly
/// determine the changes to make to the initial collection, rather than simply adding that
/// collection, running your intended logic, and then subtracting the collection.
///
/// As an example, the logic in `identifiers.rs` looks for hash collisions, and tweaks the salt
/// for all but one element in each group of collisions. Most elements do not collide, and we
/// don't need to circulate the non-colliding elements to confirm that they subtract away.
/// By iteratively developing a variable of the *edits* to the input, we can produce and circulate
/// a smaller volume of updates. This can be especially impactful when the initial collection is
/// large, and the edits to perform are relatively small.
pub struct Variable<G, C>
where
    G: Scope<Timestamp: Lattice>,
    C: Container,
{
    feedback: Handle<G, C>,
    source: Option<Collection<G, C>>,
    step: <G::Timestamp as Timestamp>::Summary,
}

/// A `Variable` specialized to a vector container of update triples (data, time, diff).
pub type VecVariable<G, D, R> = Variable<G, Vec<(D, <G as ScopeParent>::Timestamp, R)>>;

impl<G, C: Container> Variable<G, C>
where
    G: Scope<Timestamp: Lattice>,
    C: crate::collection::containers::ResultsIn<<G::Timestamp as Timestamp>::Summary>,
{
    /// Creates a new initially empty `Variable` and its associated `Collection`.
    ///
    /// The collection should be used, along with other potentially recursive collections,
    /// to define an output collection to which the variable is then `set`.
    /// In an iterative context, each collection starts empty and is repeatedly updated by
    /// the logic used to produce the collection its variable is bound to. This process
    /// continues until no changes occur, at which point we have reached a fixed point (or
    /// the range of timestamps has been exhausted). Calling `leave()` on any collection
    /// will produce its fixed point in the outer scope.
    ///
    /// In a non-iterative scope the mechanics are the same, but the interpretation varies.
    pub fn new(scope: &mut G, step: <G::Timestamp as Timestamp>::Summary) -> (Self, Collection<G, C>) {
        let (feedback, updates) = scope.feedback(step.clone());
        let collection = Collection::<G, C>::new(updates);
        (Self { feedback, source: None, step }, collection)
    }

    /// Creates a new `Variable` and its associated `Collection`, initially `source`.
    ///
    /// This method is a shortcut for a pattern that one can write manually with `new()`,
    /// but which is easy enough to get wrong that the help is valuable.
    ///
    /// This pattern uses a variable `x` to develop `x = logic(x + source) - source`,
    /// which finds a fixed point `x` that satisfies `x + source = logic(x + source)`.
    /// Equivalently, `x + source` equals the limit of repeatedly applying `logic` to `source`,
    /// so `x` itself is that limit minus `source`.
    ///
    /// To implement the pattern one would create a new initially empty variable with `new()`,
    /// then concatenate `source` into that collection, and use it as `logic` dictates.
    /// Just before the variable is set to the result collection, `source` is subtracted.
    ///
    /// If using this pattern manually, it is important to bear in mind that the collection
    /// that results from `logic` converges to its fixed point, but that once `source` is
    /// subtracted the collection converges to this limit minus `source`, a collection that
    /// may have records that accumulate to negative multiplicities, and for which the model
    /// of them as "data sets" may break down. When applying non-linear operations like
    /// `reduce`, be careful that they make sense when updates may have non-positive differences.
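    ///
    /// The sketch below is a simplified, non-incremental model of this pattern that uses only
    /// `std`; `Multiset`, `combine`, `halve_evens`, `develop_edits`, and `halving_edits` are
    /// hypothetical names for this illustration and do not call `new_from`. It develops
    /// `x = logic(x + source) - source` from an empty `x`, using the logic of the `Iterate`
    /// example, and shows that `x` can contain negative multiplicities even when `x + source`
    /// does not.
    ///
    /// ```
    /// use std::collections::BTreeMap;
    ///
    /// // Hypothetical model: each record maps to its accumulated multiplicity.
    /// type Multiset = BTreeMap<u32, i64>;
    ///
    /// // Returns `a + sign * b`, dropping records whose multiplicity becomes zero.
    /// fn combine(a: &Multiset, b: &Multiset, sign: i64) -> Multiset {
    ///     let mut out = a.clone();
    ///     for (&x, &count) in b {
    ///         *out.entry(x).or_insert(0) += sign * count;
    ///     }
    ///     out.retain(|_, count| *count != 0);
    ///     out
    /// }
    ///
    /// // The logic from the `Iterate` example: halve even numbers, keep odd numbers.
    /// fn halve_evens(input: &Multiset) -> Multiset {
    ///     let mut output = Multiset::new();
    ///     for (&x, &count) in input {
    ///         let y = if x % 2 == 0 { x / 2 } else { x };
    ///         *output.entry(y).or_insert(0) += count;
    ///     }
    ///     output.retain(|_, count| *count != 0);
    ///     output
    /// }
    ///
    /// // Develops `x = logic(x + source) - source` from an empty `x` until it stops changing.
    /// fn develop_edits(
    ///     source: &Multiset,
    ///     logic: impl Fn(&Multiset) -> Multiset,
    ///     max_rounds: usize,
    /// ) -> Multiset {
    ///     let mut x = Multiset::new();
    ///     for _ in 0..max_rounds {
    ///         let next = combine(&logic(&combine(&x, source, 1)), source, -1);
    ///         if next == x {
    ///             break;
    ///         }
    ///         x = next;
    ///     }
    ///     x
    /// }
    ///
    /// // Runs the model on the numbers 1 through 9, each with multiplicity one.
    /// fn halving_edits() -> (Multiset, Multiset) {
    ///     let source: Multiset = (1..10u32).map(|x| (x, 1)).collect();
    ///     let edits = develop_edits(&source, halve_evens, 100);
    ///     (source, edits)
    /// }
    /// ```
    ///
    /// Here the edits are `{1: 3, 2: -1, 3: 1, 4: -1, 6: -1, 8: -1}`; adding `source` back gives
    /// `{1: 4, 3: 2, 5: 1, 7: 1, 9: 1}`, the result of repeatedly halving even numbers.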
    ///
    /// Finally, writing this pattern manually lets you compute `x = logic(x + source) - source`
    /// more directly. If there is a better mechanism than adding the source, applying the logic,
    /// and then subtracting the source, it is appropriate to use it. For example, if the logic
    /// modifies only a few records, it is possible to produce this update directly without using
    /// the backstop implementation this method provides.
    pub fn new_from(source: Collection<G, C>, step: <G::Timestamp as Timestamp>::Summary) -> (Self, Collection<G, C>) where C: Clone + crate::collection::containers::Negate {
        let (feedback, updates) = source.inner.scope().feedback(step.clone());
        let collection = Collection::<G, C>::new(updates).concat(source.clone());
        (Variable { feedback, source: Some(source.negate()), step }, collection)
    }

    /// Set the definition of the `Variable` to a collection.
    ///
    /// This method binds the `Variable` to be equal to the supplied collection,
    /// which may be recursively defined in terms of the variable itself.
    pub fn set(mut self, mut result: Collection<G, C>) {
        if let Some(source) = self.source.take() {
            result = result.concat(source);
        }
        result
            .results_in(self.step)
            .inner
            .connect_loop(self.feedback);
    }
}