//! Iterative application of a differential dataflow fragment.
//!
//! The `iterate` operator takes as an argument a closure from a differential dataflow collection
//! to a collection of the same type. The output collection is the result of applying this closure
//! an unbounded number of times.
//!
//! The implementation of `iterate` does not directly apply the closure, but rather establishes an
//! iterative timely dataflow subcomputation, in which differences circulate until they dissipate
//! (indicating that the computation has reached a fixed point), or until some number of iterations
//! have passed.
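/*!
As a rough mental model of that subcomputation, the following `std`-only sketch applies a closure
to a set round after round, counts the records each round adds or removes, and stops when a round
produces no differences or when a round limit is reached. The function `iterate_sketch` and its
`max_rounds` parameter are hypothetical illustrations rather than part of this crate. Unlike
differential dataflow, the sketch recomputes whole sets instead of circulating only the differences.

```rust
use std::collections::BTreeSet;

// Hypothetical model of an iterative subcomputation over sets (not this crate's API).
// Applies `logic` until a round produces no differences (a fixed point) or until
// `max_rounds` rounds have changed the set. Returns the final set and the number of
// rounds that changed it.
fn iterate_sketch<T, F>(source: BTreeSet<T>, logic: F, max_rounds: usize) -> (BTreeSet<T>, usize)
where
    T: Ord,
    F: Fn(&BTreeSet<T>) -> BTreeSet<T>,
{
    let mut current = source;
    let mut rounds = 0;
    while rounds < max_rounds {
        let next = logic(&current);
        // The differences of this round: records added or removed.
        let differences = next.symmetric_difference(&current).count();
        if differences == 0 {
            // The differences have dissipated: `current` is a fixed point of `logic`.
            break;
        }
        current = next;
        rounds += 1;
    }
    (current, rounds)
}
```

Consider a closure that extends a set of reached nodes along the edges `0 -> 1 -> 2 -> 3`, starting
from `{0}`. Three rounds change the set, and the fourth produces no differences. With `max_rounds`
set to 1, the loop instead stops early at `{0, 1}`.
*/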
//!
//! **Note**: The dataflow assembled by `iterate` does not automatically insert `consolidate` for
//! you. This means that either (i) you should insert one yourself, (ii) you should be certain that
//! all paths from the input to the output of the loop involve consolidation, or (iii) you should
//! be worried that differences which logically cancel may circulate indefinitely.
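/*!
To build intuition for the third case, the following `std`-only sketch shows what consolidation does
to a batch of `(record, diff)` updates. It sorts the batch by record, sums the differences of equal
records, and discards records whose differences sum to zero. The helper `consolidate_updates` is
hypothetical. It mirrors the idea behind `consolidate` and is not this crate's implementation.

```rust
// Hypothetical helper illustrating consolidation (not this crate's implementation).
fn consolidate_updates<D: Ord>(mut updates: Vec<(D, i64)>) -> Vec<(D, i64)> {
    // Bring equal records next to each other.
    updates.sort_by(|a, b| a.0.cmp(&b.0));
    let mut result: Vec<(D, i64)> = Vec::new();
    for (data, diff) in updates {
        // Accumulate into the previous entry when the record repeats.
        if let Some(last) = result.last_mut() {
            if last.0 == data {
                last.1 += diff;
                continue;
            }
        }
        result.push((data, diff));
    }
    // Records whose differences cancel contribute nothing and are dropped.
    result.retain(|(_, diff)| *diff != 0);
    result
}
```

A batch such as `[("a", 1), ("a", -1)]` is non-empty as a vector but logically empty. A loop that
never performs a step like this may keep circulating such pairs even though they contribute nothing.
*/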
//!
//! # Details
//!
//! The `iterate` method is written using a `Variable`, which lets you define your own iterative
//! computations when `iterate` itself is not sufficient. This can happen when you have two
//! collections that should evolve simultaneously, or when you would like to rotate your loop and
//! return an intermediate result.
//!
//! Using `Variable` requires more explicit arrangement of your computation, but isn't much more
//! complicated. You create a new variable, either initially empty with `Variable::new` or from an
//! existing collection (its initial value) with `Variable::new_from`. You then set it to be a
//! function of this variable (and perhaps other collections and variables).
//!
//! Creating a `Variable` also returns a `Collection`, the one corresponding to its value in each
//! iteration, and that collection can be used in most situations where a collection can be used.
//! The act of setting a `Variable` consumes it, preventing you from setting it multiple times.

use std::fmt::Debug;

use timely::Container;
use timely::progress::Timestamp;
use timely::order::Product;

use timely::dataflow::*;
use timely::dataflow::scope::Iterative;
use timely::dataflow::operators::{Feedback, ConnectLoop};
use timely::dataflow::operators::feedback::Handle;

use crate::{Data, VecCollection, Collection};
use crate::difference::{Semigroup, Abelian};
use crate::lattice::Lattice;

/// An extension trait for the `iterate` method.
pub trait Iterate<'scope, T: Timestamp + Lattice, D: Data, R: Semigroup> {
    /// Iteratively apply `logic` to the source collection until convergence.
    ///
    /// Importantly, this method does not automatically consolidate results.
    /// It may be important to conclude the closure you supply with `consolidate()` to ensure that
    /// logically empty collections that contain cancelling records do not result in non-termination.
    /// Operators like `reduce`, `distinct`, and `count` also perform consolidation, and are safe to conclude with.
    ///
    /// The closure is also passed a copy of the inner scope, to facilitate importing external collections.
    /// It can also be acquired by calling `.scope()` on the closure's collection argument, but the code
    /// can be awkward to write fluently.
    ///
    /// # Examples
    ///
    /// ```
    /// use differential_dataflow::input::Input;
    /// use differential_dataflow::operators::Iterate;
    ///
    /// ::timely::example(|scope| {
    ///
    ///     scope.new_collection_from(1 .. 10u32).1
    ///          .iterate(|_scope, values| {
    ///              values.map(|x| if x % 2 == 0 { x/2 } else { x })
    ///                    .consolidate()
    ///          });
    /// });
    /// ```
    fn iterate<F>(self, logic: F) -> VecCollection<'scope, T, D, R>
    where
        for<'inner> F: FnOnce(Iterative<'inner, T, u64>, VecCollection<'inner, Product<T, u64>, D, R>)->VecCollection<'inner, Product<T, u64>, D, R>;
}

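/**
The closure in the example on `Iterate::iterate` halves every even value and leaves odd values
unchanged, so each record is driven to its largest odd divisor. The `std`-only sketch below computes
the fixed point that loop should reach. It models a collection as a hypothetical `Counts` map from
record to multiplicity. `Counts`, `map_counts`, and `halve_until_fixed_point` are illustrations
only, not this crate's representation or API.

```rust
use std::collections::BTreeMap;

// Hypothetical model of a consolidated collection: record -> multiplicity, no zero entries.
type Counts = BTreeMap<u32, i64>;

// Apply `f` to every record, merging the multiplicities of equal outputs.
fn map_counts(input: &Counts, f: impl Fn(u32) -> u32) -> Counts {
    let mut out = Counts::new();
    for (&x, &m) in input {
        *out.entry(f(x)).or_insert(0) += m;
    }
    out.retain(|_, m| *m != 0);
    out
}

// Repeat the example's halving step until a round changes nothing.
fn halve_until_fixed_point(source: Counts) -> Counts {
    let mut current = source;
    loop {
        let next = map_counts(&current, |x| if x % 2 == 0 { x / 2 } else { x });
        if next == current {
            return current;
        }
        current = next;
    }
}
```

For the input `1 .. 10` this yields `1` with multiplicity 4 (from 1, 2, 4, 8), `3` with
multiplicity 2 (from 3, 6), and `5`, `7`, `9` once each.
*/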
impl<'scope, T: Timestamp + Lattice, D: Ord+Data+Debug, R: Abelian+'static> Iterate<'scope, T, D, R> for VecCollection<'scope, T, D, R> {
    fn iterate<F>(self, logic: F) -> VecCollection<'scope, T, D, R>
    where
        for<'inner> F: FnOnce(Iterative<'inner, T, u64>, VecCollection<'inner, Product<T, u64>, D, R>)->VecCollection<'inner, Product<T, u64>, D, R>,
    {
        let outer = self.inner.scope();
        outer.scoped("Iterate", |subgraph| {
            // create a new variable, apply logic, bind variable, return.
            //
            // this could be much more succinct if we returned the collection
            // wrapped by `variable`, but it also results in substantially more
            // diffs produced; `result` is post-consolidation, and means fewer
            // records are yielded out of the loop.
            let (variable, collection) = Variable::new_from(self.enter(subgraph), Product::new(Default::default(), 1));
            let result = logic(subgraph, collection);
            variable.set(result.clone());
            result.leave(outer)
        })
    }
}

impl<'scope, T: Timestamp + Lattice, D: Ord+Data+Debug, R: Semigroup+'static> Iterate<'scope, T, D, R> for Scope<'scope, T> {
    fn iterate<F>(self, logic: F) -> VecCollection<'scope, T, D, R>
    where
        for<'inner> F: FnOnce(Iterative<'inner, T, u64>, VecCollection<'inner, Product<T, u64>, D, R>)->VecCollection<'inner, Product<T, u64>, D, R>,
    {
        let outer = self;
        self.scoped("Iterate", |subgraph| {
            // create a new variable, apply logic, bind variable, return.
            //
            // this could be much more succinct if we returned the collection
            // wrapped by `variable`, but it also results in substantially more
            // diffs produced; `result` is post-consolidation, and means fewer
            // records are yielded out of the loop.
            let (variable, collection) = Variable::new(subgraph, Product::new(Default::default(), 1));
            let result = logic(subgraph, collection);
            variable.set(result.clone());
            result.leave(outer)
        })
    }
}

/// A recursively defined collection.
///
/// The `Variable` struct supports differential dataflow programs that require more sophisticated
/// iterative patterns than singly recursive iteration. For example, in mutual recursion two
/// collections evolve simultaneously.
///
/// # Examples
///
/// The following example is equivalent to the example for the `Iterate` trait.
///
/// ```
/// use timely::order::Product;
/// use timely::dataflow::Scope;
///
/// use differential_dataflow::input::Input;
/// use differential_dataflow::operators::iterate::Variable;
///
/// ::timely::example(|scope| {
///
///     let numbers = scope.new_collection_from(1 .. 10u32).1;
///
///     scope.iterative::<u64,_,_>(|nested| {
///         let summary = Product::new(Default::default(), 1);
///         let (variable, collection) = Variable::new_from(numbers.enter(nested), summary);
///         let result = collection.map(|x| if x % 2 == 0 { x/2 } else { x })
///                                .consolidate();
///         variable.set(result.clone());
///         result.leave(scope)
///     });
/// })
/// ```
///
/// Variables support iterative patterns that can be both more flexible and more efficient.
///
/// Mutual recursion is when one defines multiple variables in the same iterative context,
/// and their definitions are not independent. For example, odd numbers and even numbers
/// can be determined from each other, iteratively.
///
/// ```
/// use timely::order::Product;
/// use timely::dataflow::Scope;
///
/// use differential_dataflow::input::Input;
/// use differential_dataflow::operators::iterate::Variable;
///
/// ::timely::example(|scope| {
///
///     let numbers = scope.new_collection_from(10 .. 20u32).1;
///
///     scope.iterative::<u64,_,_>(|nested| {
///         let summary = Product::new(Default::default(), 1);
///         let (even_v, even) = Variable::new_from(numbers.clone().enter(nested).filter(|x| x % 2 == 0), summary);
///         let (odds_v, odds) = Variable::new_from(numbers.clone().enter(nested).filter(|x| x % 2 == 1), summary);
///         odds_v.set(even.clone().filter(|x| *x > 0).map(|x| x-1).concat(odds.clone()).distinct());
///         even_v.set(odds.clone().filter(|x| *x > 0).map(|x| x-1).concat(even.clone()).distinct());
///     });
/// })
/// ```
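/**
The fixed point of the mutual recursion example can be checked with a small `std`-only model. In
this model, sets stand in for the two collections, and set union plays the role of `concat`
followed by `distinct`. As in the dataflow, both sets are recomputed from the previous round's
values, and the loop stops once neither changes. The function `odds_and_evens` is a hypothetical
illustration, not part of this crate.

```rust
use std::collections::BTreeSet;

// Hypothetical model of the mutually recursive `odds` and `even` variables in the example.
fn odds_and_evens(numbers: &[u32]) -> (BTreeSet<u32>, BTreeSet<u32>) {
    // Initial values, as in the two `Variable::new_from` calls.
    let mut evens: BTreeSet<u32> = numbers.iter().copied().filter(|x| *x % 2 == 0).collect();
    let mut odds: BTreeSet<u32> = numbers.iter().copied().filter(|x| *x % 2 == 1).collect();
    loop {
        // Each positive even number contributes its predecessor to `odds`, and vice versa.
        let next_odds: BTreeSet<u32> = evens
            .iter()
            .filter(|&&x| x > 0)
            .map(|&x| x - 1)
            .chain(odds.iter().copied())
            .collect();
        let next_evens: BTreeSet<u32> = odds
            .iter()
            .filter(|&&x| x > 0)
            .map(|&x| x - 1)
            .chain(evens.iter().copied())
            .collect();
        if next_odds == odds && next_evens == evens {
            return (odds, evens);
        }
        odds = next_odds;
        evens = next_evens;
    }
}
```

Starting from `10 .. 20`, the model settles with `odds` holding every odd number from 1 to 19 and
`evens` holding every even number from 0 to 18.
*/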
///
/// Direct construction can be more efficient than `iterate` when you know a way to directly
/// determine the changes to make to the initial collection, rather than simply adding that
/// collection, running your intended logic, and then subtracting the collection.
///
/// As an example, the logic in `identifiers.rs` looks for hash collisions, and tweaks the salt
/// for all but one element in each group of collisions. Most elements do not collide, and we
/// don't need to circulate the non-colliding elements to confirm that they subtract away.
/// By iteratively developing a variable of the *edits* to the input, we can produce and circulate
/// a smaller volume of updates. This can be especially impactful when the initial collection is
/// large and the edits to perform are relatively small.
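/**
The add-then-subtract pattern that `Variable::new_from` implements, `x = logic(x + source) - source`,
can be modelled with `std` multisets to see what such a variable of edits contains. In this sketch,
`Counts`, `add_scaled`, and `develop_edits` are hypothetical names. The iteration is a plain loop
rather than a dataflow, and it assumes `logic` converges.

```rust
use std::collections::BTreeMap;

// Hypothetical multiset of updates: record -> accumulated multiplicity, no zero entries.
type Counts = BTreeMap<u32, i64>;

// Computes `a + sign * b`, dropping records whose multiplicity becomes zero.
fn add_scaled(a: &Counts, b: &Counts, sign: i64) -> Counts {
    let mut out = a.clone();
    for (&k, &m) in b {
        *out.entry(k).or_insert(0) += sign * m;
    }
    out.retain(|_, m| *m != 0);
    out
}

// Develops `x = logic(x + source) - source` from an empty `x` until it stops changing
// (assumes `logic` converges). At that point `x + source = logic(x + source)`, and `x`
// holds only the edits to `source`.
fn develop_edits(source: &Counts, logic: impl Fn(&Counts) -> Counts) -> Counts {
    let mut x = Counts::new();
    loop {
        let current = add_scaled(&x, source, 1);
        let next = add_scaled(&logic(&current), source, -1);
        if next == x {
            return x;
        }
        x = next;
    }
}
```

Suppose `source` holds `1 .. 10` once each and a hypothetical `logic` decrements every record above 5.
The loop settles on `x = {5: +4, 6: -1, 7: -1, 8: -1, 9: -1}`. The untouched records 1 through 4
never appear in `x`, and several records accumulate to negative multiplicities.
*/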
pub struct Variable<'scope, T, C>
where
    T: Timestamp + Lattice,
    C: Container,
{
    feedback: Handle<'scope, T, C>,
    source: Option<Collection<'scope, T, C>>,
    step: T::Summary,
}

/// A `Variable` specialized to a vector container of update triples (data, time, diff).
pub type VecVariable<'scope, T, D, R> = Variable<'scope, T, Vec<(D, T, R)>>;

impl<'scope, T, C: Container> Variable<'scope, T, C>
where
    T: Timestamp + Lattice,
    C: crate::collection::containers::ResultsIn<T::Summary>,
{
    /// Creates a new initially empty `Variable` and its associated `Collection`.
    ///
    /// The collection should be used, along with other potentially recursive collections,
    /// to define an output collection to which the variable is then `set`.
    /// In an iterative context, each collection starts empty and is repeatedly updated by
    /// the logic used to produce the collection its variable is bound to. This process
    /// continues until no changes occur, at which point we have reached a fixed point (or
    /// the range of timestamps has been exhausted). Calling `leave()` on any collection
    /// will produce its fixed point in the outer scope.
    ///
    /// In a non-iterative scope the mechanics are the same, but the interpretation varies.
    pub fn new(scope: Scope<'scope, T>, step: T::Summary) -> (Self, Collection<'scope, T, C>) {
        let (feedback, updates) = scope.feedback(step.clone());
        let collection = Collection::<T, C>::new(updates);
        (Self { feedback, source: None, step }, collection)
    }

    /// Creates a new `Variable` and its associated `Collection`, initially `source`.
    ///
    /// This method is a shortcut for a pattern that one can write manually with `new()`,
    /// but which is easy enough to get wrong that the help is valuable.
    ///
    /// This pattern uses a variable `x` to develop `x = logic(x + source) - source`,
    /// which finds a fixed point `x` that satisfies `x + source = logic(x + source)`.
    /// Equivalently, `x + source` is the limit of repeatedly applying `logic` to `source`,
    /// and `x` itself is that limit minus `source`.
    ///
    /// To implement the pattern one would create a new initially empty variable with `new()`,
    /// then concatenate `source` into that collection, and use it as `logic` dictates.
    /// Just before the variable is set to the result collection, `source` is subtracted.
    ///
    /// If using this pattern manually, it is important to bear in mind that the collection
    /// that results from `logic` converges to its fixed point, but that once `source` is
    /// subtracted the collection converges to this limit minus `source`. That collection
    /// may have records that accumulate to negative multiplicities, and for which the model
    /// of them as "data sets" may break down. Be careful when applying non-linear operations
    /// like `reduce` that they make sense when updates may have non-positive differences.
    ///
    /// Finally, implementing this pattern manually lets you compute
    /// `x = logic(x + source) - source` more directly. If there is a different way to produce `x`
    /// than adding the source, applying the logic, and then subtracting the source, it is
    /// appropriate to use it. For example, if the logic modifies only a few records, it is possible
    /// to produce this update directly without using the backstop implementation this method provides.
    pub fn new_from(source: Collection<'scope, T, C>, step: T::Summary) -> (Self, Collection<'scope, T, C>) where C: Clone + crate::collection::containers::Negate {
        let (feedback, updates) = source.inner.scope().feedback(step.clone());
        let collection = Collection::<T, C>::new(updates).concat(source.clone());
        (Variable { feedback, source: Some(source.negate()), step }, collection)
    }

    /// Set the definition of the `Variable` to a collection.
    ///
    /// This method binds the `Variable` to be equal to the supplied collection,
    /// which may be recursively defined in terms of the variable itself.
    pub fn set(mut self, mut result: Collection<'scope, T, C>) {
        if let Some(source) = self.source.take() {
            result = result.concat(source);
        }
        result
            .results_in(self.step)
            .inner
            .connect_loop(self.feedback);
    }
}